satc_source

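Read and debug the code of the SaTC tool to see whether any improvements can be made.

Test input. Note that inside the SaTC Docker container, the VS Code Python extension must be set to a version released in 2021; otherwise Python 2 cannot be debugged.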

// launch.json
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "args": ["-d", "/dataset/_RAXE500-V1.0.12.96_2.0.45.chk.extracted", "-o", "./out2.txt", "--ghidra_script=ref2sink_cmdi", "--ghidra_script=ref2sink_bof", "-b", "httpd", "--taint_check"],
            "console": "integratedTerminal",
            "justMyCode": true,
            "env": {"PYTHONPATH": "/home/satc/.virtualenvs/SaTC/lib/python2.7/site-packages/"}
        }
    ]
}

Entry point

At the entry point in satc.py, the program parses its input arguments. There are four scripts for testing a binary:

  • ref2share
  • share2sink

The two scripts above test whether a vulnerability exists in data passed across binaries.

  • ref2sink_cmdi
  • ref2sink_bof

The two scripts above test whether a single binary contains a cmdi (command injection) or bof (buffer overflow) vulnerability.
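
To make the argument handling concrete, here is a minimal sketch of a parser that accepts the flags used in the launch.json above. It is an illustration only: the flag names are copied from that file, but `build_parser`, the `dest` names and the use of `argparse` are assumptions, not SaTC's actual code.

```python
import argparse

# The four script names listed above.
GHIDRA_SCRIPTS = ["ref2share", "share2sink", "ref2sink_cmdi", "ref2sink_bof"]


def build_parser():
    """Hypothetical parser; only the flag names come from launch.json."""
    parser = argparse.ArgumentParser(description="toy SaTC-style command line")
    parser.add_argument("-d", dest="directory", required=True)
    parser.add_argument("-o", dest="output", required=True)
    # -b and --ghidra_script may be repeated, so collect them in lists.
    parser.add_argument("-b", dest="binaries", action="append")
    parser.add_argument("--ghidra_script", action="append", choices=GHIDRA_SCRIPTS)
    parser.add_argument("--taint_check", action="store_true")
    return parser


args = build_parser().parse_args([
    "-d", "/dataset/_RAXE500-V1.0.12.96_2.0.45.chk.extracted",
    "-o", "./out2.txt",
    "--ghidra_script=ref2sink_cmdi",
    "--ghidra_script=ref2sink_bof",
    "-b", "httpd",
    "--taint_check",
])
print(args.ghidra_script, args.binaries, args.taint_check)
```

Collecting `--ghidra_script` with `action="append"` is one simple way to let several scripts run against the same binary in a single invocation.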

front analysis

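After the entry point comes the front-end analysis. This part mainly does the following:

  1. Scan all front-end files in the firmware, which cover five types: asp, html, js, php, and xml. Depending on the file format, keywords or functions are extracted with regular expressions. The output looks like this: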
2023-08-18 05:54:20,292-root-logger.py-[line:112]-INFO : Find Keyword : allowedValueList PATH: /dataset/_RAXE500-V1.0.12.96_2.0.45.chk.extracted/squashfs-root/www/Public_UPNP_WANEtherLinkCfg.xml
2023-08-18 05:54:20,293-root-logger.py-[line:112]-INFO : Find Keyword : allowedValue PATH: /dataset/_RAXE500-V1.0.12.96_2.0.45.chk.extracted/squashfs-root/www/Public_UPNP_WANEtherLinkCfg.xml
2023-08-18 05:54:20,293-root-logger.py-[line:112]-INFO : Find Keyword : allowedValue PATH: /dataset/_RAXE500-V1.0.12.96_2.0.45.chk.extracted/squashfs-root/www/Public_UPNP_WANEtherLinkCfg.xml

Concretely, extraction is done by regular-expression matching. Below is the HTML example.

def get_keyword(self, html):
    html_content = html.decode('utf-8', "ignore")
    name_list = re.findall(r'name="(.*?)"', html_content)
    id_list = re.findall(r'id="(.*?)"', html_content)
    results = set(name_list) | set(id_list)
    for res in results:
        self._get_keyword(res, check=0)

def get_function(self, html):
    html_content = html.decode('utf-8', "ignore")
    path_list = re.findall(r'action="(.*?)"', html_content)
    for path in path_list:
        self._get_function(path, check=0)

def get_js_src(self, html):
    html_content = html.decode('utf-8', 'ignore')
    src_list = re.findall(r'<script src="(.*?)"></script>', html_content)
    for src in src_list:
        res = src.find("?")
        if res > 0:
            src = src[:res]
        src_file = src.split("/")[-1]
        js_obj = self.jsfile_citations.get(src_file, JSFile(src))
        js_obj.add_depend(self.fpath)
        self.jsfile_citations.update({src_file: js_obj})

After that, the extracted keywords and functions are saved in self.analysise_obj. The functions here also include the name of the CGI that a form is POSTed to.
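
The effect of these regular expressions is easiest to see on a small input. The sketch below applies the same three patterns to a made-up HTML fragment; `sample_html` and `extract_front_end_info` are hypothetical names introduced only for this illustration.

```python
import re

# Made-up page fragment, not taken from any real firmware.
sample_html = b'''<form action="/goform/setWan" method="post">
<input name="wan_ip" id="wanIp">
<input name="wan_mask">
<script src="/js/wan.js?v=3"></script>
</form>'''


def extract_front_end_info(html_bytes):
    """Return (keywords, form actions, referenced js file names)."""
    text = html_bytes.decode("utf-8", "ignore")
    # name= and id= attributes become candidate keywords.
    keywords = set(re.findall(r'name="(.*?)"', text)) | set(re.findall(r'id="(.*?)"', text))
    # action= attributes name the handler a form is submitted to.
    actions = re.findall(r'action="(.*?)"', text)
    scripts = []
    for src in re.findall(r'<script src="(.*?)"></script>', text):
        query = src.find("?")
        if query > 0:
            src = src[:query]  # drop the query string, as get_js_src does
        scripts.append(src.split("/")[-1])
    return keywords, actions, scripts


keywords, actions, scripts = extract_front_end_info(sample_html)
print(sorted(keywords), actions, scripts)
```

Note that the script pattern only matches tags written exactly as `<script src="..."></script>`; tags with extra attributes would be missed by it.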

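  2. get_analysise_result collects the information above and filters out keywords that appear in more than a certain number of different files, recording them as removed, perhaps for efficiency. This could actually cause problems; I do not know why they are removed.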

[Figure: image-20230818142111007]

The removal code is shown below; PARA_MAX_FRONT defaults to 10.

[Figure: image-20230818143812334]
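
Since the removal code is only available as a screenshot, here is a rough sketch of the filtering as described in the text. It assumes that a keyword is dropped when it occurs in strictly more than PARA_MAX_FRONT distinct files; the function name, the data layout and the exact comparison are assumptions, not the SaTC implementation.

```python
PARA_MAX_FRONT = 10  # default value mentioned in the text


def split_keywords(keyword_files, limit=PARA_MAX_FRONT):
    """Split {keyword: [files]} into (kept, removed) by distinct file count."""
    kept, removed = {}, {}
    for keyword, files in keyword_files.items():
        distinct = sorted(set(files))
        # Assumption: "too common" means more than `limit` distinct files.
        target = removed if len(distinct) > limit else kept
        target[keyword] = distinct
    return kept, removed


keyword_files = {
    "wan_ip": ["wan.html", "wan.js"],
    "page": ["p%d.html" % i for i in range(11)],  # appears in 11 files
}
kept, removed = split_keywords(keyword_files)
print(sorted(kept), sorted(removed))
```

A filter of this shape trades recall for speed: a very common keyword is unlikely to identify a single handler, but dropping it can also hide a real input parameter, which is the concern raised above.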

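  3. Enter the backanalysis stage, which checks the js files referenced by the specified binary (or by the whole firmware) and uses strings to extract the strings in the binary.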
def get_string(self):
    res = execute("strings '{}'".format(self.binaryfile))
    self.bin_strings = set(res.split("\n"))
    b_total.add(len(self.bin_strings))
    return self.bin_strings

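Then, based on the function and keyword information extracted earlier from all front-end files, it looks for the corresponding strings, function names, and so on in the specified binary. Each (meaningful function name, meaningful string) pair is appended to elf_result as a tuple.

The results are then recorded together with the name of the source file from which each keyword or function name was obtained, and everything is saved as the result.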
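
The matching step can be sketched without the external strings tool. The example below extracts printable ASCII runs from a byte blob (a rough stand-in for `strings`, with a minimum length of 4) and intersects them with the front-end keywords. It assumes exact string equality; `printable_strings` and `match_keywords` are hypothetical helpers.

```python
import re

# Runs of at least 4 printable ASCII bytes, roughly what `strings` reports.
PRINTABLE_RUN = re.compile(rb"[\x20-\x7e]{4,}")


def printable_strings(blob):
    """Return the set of printable strings found in a byte blob."""
    return {m.decode("ascii") for m in PRINTABLE_RUN.findall(blob)}


def match_keywords(blob, keywords):
    """Keep only the front-end keywords that also occur in the binary."""
    return sorted(set(keywords) & printable_strings(blob))


# Toy "binary": NUL-separated strings with some non-printable bytes.
blob = b"\x7fELF\x00\x01wan_ip\x00GET /index\x00ab\x00wan_mask\x00"
matched = match_keywords(blob, ["wan_ip", "dns1", "wan_mask"])
print(matched)
```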

ghidra analysis

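ghidra analysis runs the locally specified Ghidra scripts. Here I chose ref2sink_cmdi and ref2sink_bof for analysis. Judging from the results, Ghidra is used to determine which functions contain a given keyword, what the call chain of each such function is, and, heuristically, which functions near that location may accept parameters. First, the output:

[Figure: image-20230818152539744]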

ref2sink_cmdi

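The following function is the core of the data-flow tracing. Given a target (a string), it finds where the target is located in the program and then, from that location, finds the function that references the string. findSinkPath then looks for a path from this function to a target sink function; the code uses DFS, which is fairly time-consuming. If no function directly contains the referencing address, it searches one level further, that is, among the references to that address. This may be because some addresses are not parsed as part of any function (I am not sure).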

def searchParam(target, refstart=None, refend=None):
    if DEBUG:
        print 'start searching "%s" ...' % target
    curAddr = currentProgram.minAddress
    end = currentProgram.maxAddress
    haveWayToSink = False
    checkedRefAddr = set()
    while curAddr < end:
        curAddr = find(curAddr, target)
        if curAddr is None:
            break
        if getByte(curAddr.add(len(target))) != 0:
            curAddr = curAddr.add(1)
            continue
        for ref in getReferencesTo(curAddr):
            if refstart is not None and refstart > ref.fromAddress:
                continue
            if refend is not None and refend < ref.fromAddress:
                continue
            if target not in newParam:
                referenced.add(target)
            caller = getFunctionContaining(ref.fromAddress)
            if caller is not None:
                if DEBUG:
                    print 'Reference From', a2h(ref.fromAddress), '(%s)' % caller,
                    print 'To', a2h(curAddr), '("%s")' % target
                if ref.fromAddress in checkedRefAddr:
                    continue
                haveWayToSink = findSinkPath(ref.fromAddress, curAddr, target) or haveWayToSink
                checkedRefAddr.add(ref.fromAddress)
            else:
                for ref2 in getReferencesTo(ref.fromAddress):
                    caller = getFunctionContaining(ref2.fromAddress)
                    if caller is None:
                        if DEBUG:
                            print 'Ignore', getSymbolAt(ref2.fromAddress), 'at', a2h(ref2.fromAddress)
                        continue
                    if DEBUG:
                        print 'Reference From', a2h(ref2.fromAddress), '(%s)' % caller,
                        print 'To', a2h(ref.fromAddress), '(%s)' % getSymbolAt(ref.fromAddress),
                        print 'To', a2h(curAddr), '("%s")' % target
                    if ref2.fromAddress in checkedRefAddr:
                        continue
                    haveWayToSink = findSinkPath(ref2.fromAddress, curAddr, target) or haveWayToSink
                    checkedRefAddr.add(ref2.fromAddress)

        curAddr = curAddr.add(1)
    if DEBUG:
        print 'finish searching "%s"' % target
    return haveWayToSink
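
The `getByte(...) != 0` test in the loop keeps only occurrences that are immediately followed by a NUL byte, so a longer string that merely starts with the target is skipped. A minimal sketch of that idea on a plain byte buffer, outside Ghidra, could look like this (`find_exact_strings` is a hypothetical helper, not part of the script):

```python
def find_exact_strings(image, target):
    """Offsets where `target` occurs and is directly followed by a NUL byte."""
    hits = []
    pos = image.find(target)
    while pos != -1:
        end = pos + len(target)
        # Indexing bytes yields an int in Python 3, so compare with 0.
        if end < len(image) and image[end] == 0:
            hits.append(pos)
        pos = image.find(target, pos + 1)
    return hits


# Three C strings laid out back to back.
image = b"wan_ip\x00wan_ip_old\x00xwan_ip\x00"
hits = find_exact_strings(image, b"wan_ip")
print(hits)
```

As in the original loop, only the byte after the match is checked, so the tail of a longer string (here `xwan_ip`) still counts as a hit.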

In findSinkPath, search is first called to find all the functions that are called.

# Excerpt from the body of findSinkPath (the def line is not shown here)
startFunc = getFunctionContaining(refaddr)
assert startFunc is not None

pending.append(startFunc)
while len(pending):
    search(pending.pop())

vulnerable = dfs(startFunc, [], refaddr)
if vulnerable:
    searchStrArg(startFunc)
return vulnerable

# Below is the search function: it records every callee of this function
# and queues it, until no uncalled function is left
def search(func, start=None):
    if func in callMap:
        return
    callMap[func] = {}

    start = start or func.entryPoint
    end = func.body.maxAddress

    inst = getInstructionAt(start)
    while inst is not None and inst.address < end:
        callee = getCallee(inst)
        if callee is not None:
            callMap[func][inst.address] = callee
            if callee not in callMap:
                pending.append(callee)
        inst = inst.next
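
The pending/callMap pair is a standard worklist traversal. The sketch below reproduces it on a toy call graph given as plain dictionaries; `callees_of` stands in for what getCallee would report per instruction and, like the function name, is purely illustrative.

```python
def build_call_map(start, callees_of):
    """Collect {function: {call address: callee}} for everything reachable from start."""
    call_map = {}
    pending = [start]
    while pending:
        func = pending.pop()
        if func in call_map:
            continue  # already scanned
        call_map[func] = dict(callees_of.get(func, {}))
        for callee in call_map[func].values():
            if callee not in call_map:
                pending.append(callee)
    return call_map


# Toy program: handler -> do_cmd -> system; "unrelated" is never reached.
callees_of = {
    "handler": {0x10: "get_param", 0x20: "do_cmd"},
    "do_cmd": {0x30: "system"},
    "unrelated": {0x40: "puts"},
}
call_map = build_call_map("handler", callees_of)
print(sorted(call_map))
```

Because each function is scanned at most once, the cost is linear in the number of reachable call sites; the expensive part is the path enumeration that follows.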

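Then dfs is called. It searches from refaddr (the location that references the string) until one of the called functions is a sink, that is, a system-like function. Along the way it checks whether a callee is listed in needCheckConstantStr (or needCheckFormat), which tests whether the argument is a constant string and whether the string may have been assembled by concatenation or formatting.

In the cmdi check, sprintf from needCheckFormat appears neither in sinks nor in the bof check. This is one of its problems.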

sinks = ['system', '___system', 'bstar_system', 'popen',
         'doSystemCmd', 'doShell', 'twsystem', 'CsteSystem', 'cgi_deal_popen',
         'ExeCmd', 'ExecShell', 'exec_shell_popen', 'exec_shell_popen_str'
         ]
needCheckConstantStr = {
    'system': 0,
    'fwrite': 0,
    '___system': 0,
    'bstar_system': 0,
    'popen': 0,
    'execve': 0,
    'strcpy': 1,
    'strcat': 1,
    'strncpy': 1,
    'memcpy': 1,
    'twsystem': 0
}
needCheckFormat = {
    'sprintf': 1,
    'doSystemCmd': 0,
    'doShell': 0
}

def dfs(func, path, start=None):
    '''path: list of (addr of call, callee, callDigestFunc)'''
    # In the cmdi check, sprintf from needCheckFormat does not appear in sinks
    if func.name in sinks and len(path):
        if func.name in needCheckConstantStr and checkConstantStr(path[-1][0], needCheckConstantStr[func.name]):
            return False
        if func.name in needCheckFormat and checkSafeFormat(path[-1][0], needCheckFormat[func.name]):
            return False
        printpath(path)
        return True
    callDigestFunc = False
    vulnerable = False
    for addr, callee in sorted(callMap[func].items()):
        if start is not None and addr < start:
            continue
        if not callDigestFunc and callee.name in digest:
            if callee.name in needCheckConstantStr and checkConstantStr(addr, needCheckConstantStr[callee.name]):
                pass
            elif callee.name in needCheckFormat and checkSafeFormat(addr, needCheckFormat[callee.name]):
                pass
            else:
                callDigestFunc = True
        if callee in [x[1] for x in path] + [startFunc] or callee in safeFuncs:
            continue
        vulnerable = dfs(callee, path + [(addr, callee, callDigestFunc)]) or vulnerable
    if not vulnerable and func != startFunc:
        safeFuncs.add(func)
    return vulnerable
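
Stripped of the Ghidra specifics, the traversal is a depth-first search over the call map that stops at the first sink on each branch. The following sketch shows only that skeleton on a toy call map; it leaves out the constant-argument checks, the safeFuncs cache and the start-address filter, and all names in it are illustrative.

```python
SINKS = {"system", "popen"}  # small subset, for illustration only


def find_sink_paths(call_map, start):
    """Return every call chain from `start` that ends in a sink."""
    found = []

    def dfs(func, path):
        if func in SINKS and path:
            found.append([name for _, name in path])
            return
        for addr, callee in sorted(call_map.get(func, {}).items()):
            # Skip functions already on the path to avoid looping forever.
            if callee == start or any(callee == name for _, name in path):
                continue
            dfs(callee, path + [(addr, callee)])

    dfs(start, [])
    return found


call_map = {
    "handler": {0x10: "get_param", 0x20: "do_cmd"},
    "do_cmd": {0x30: "system", 0x34: "handler"},  # recursive call back to handler
    "get_param": {},
    "system": {},
}
paths = find_sink_paths(call_map, "handler")
print(paths)
```

Without a cache such as safeFuncs, the same subtree can be re-explored once per distinct path leading to it, which is one reason the original search can be slow on large binaries.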

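As for the concrete argument check: on ARM and MIPS, string arguments are passed in registers. The code below appears to use symbolic evaluation (Ghidra's SymbolicPropogator) to decide whether, when execution runs from the start of the function to the given location, the corresponding register holds a constant string.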

def getCallingArgs(addr, pos):
    if not 0 <= pos <= 3:
        return
    arch = str(currentProgram.language.processor)
    if arch == 'ARM':
        reg = currentProgram.getRegister('r%d' % pos)
    elif arch == 'MIPS':
        nextInst = getInstructionAt(addr).next
        if len(nextInst.pcode):  # not NOP
            addr = addr.add(8)
        reg = currentProgram.getRegister('a%d' % pos)
    else:
        return
    return getRegister(addr, reg)


def getRegister(addr, reg):
    if analyzer is None:
        getAnalyzer()

    func = getFunctionContaining(addr)
    if func is None:
        return

    if func in syms:
        symEval = syms[func]
    else:
        symEval = SymbolicPropogator(currentProgram)
        symEval.setParamRefCheck(True)
        symEval.setReturnRefCheck(True)
        symEval.setStoredRefCheck(True)
        analyzer.flowConstants(currentProgram, func.entryPoint, func.body, symEval, monitor)
        syms[func] = symEval

    return symEval.getRegisterValue(addr, reg)

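Alternatively, for the functions in needCheckFormat, it checks whether the call is safe. If a %s would consume an argument beyond index 3 (that is, beyond the first four register-passed arguments), the call is judged unsafe. If it is within the first four but the string at that position turns out not to be a constant, the call is also judged unsafe.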

def checkConstantStr(addr, argpos=0):
    # empty string is not considered as constant, for it may be uninitialized global variable
    return bool(getStrArg(addr, argpos))


def checkSafeFormat(addr, offset=0):
    data = getStrArg(addr, offset)
    if data is None:
        return False

    fmtIndex = offset
    for i in range(len(data) - 1):
        if data[i] == '%' and data[i + 1] != '%':
            fmtIndex += 1
            if data[i + 1] == 's':
                if fmtIndex > 3:
                    return False
                if not checkConstantStr(addr, fmtIndex):
                    return False
    return True
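
The same rule can be tried out on plain Python values. In the sketch below a call is modelled as a list of arguments in which a constant string is a `str` and anything the analysis could not resolve is `None`; this modelling and the name `check_safe_format` are assumptions made for the example.

```python
def check_safe_format(args, fmt_pos=0):
    """True if every %s in the format string is fed by a known constant string."""
    fmt = args[fmt_pos] if fmt_pos < len(args) else None
    if not fmt:
        return False  # format string itself is unknown or empty
    arg_index = fmt_pos
    for i in range(len(fmt) - 1):
        if fmt[i] == "%" and fmt[i + 1] != "%":
            arg_index += 1  # every conversion consumes one argument
            if fmt[i + 1] == "s":
                if arg_index > 3:
                    return False  # beyond the four register arguments
                if arg_index >= len(args) or not args[arg_index]:
                    return False  # argument is not a constant string
    return True


# doSystemCmd-style call: the format string is argument 0.
safe = check_safe_format(["ping %s", "127.0.0.1"])
unsafe = check_safe_format(["ping %s", None])
# sprintf-style call: the destination buffer is argument 0, the format is argument 1.
mixed = check_safe_format([None, "ifconfig %s %s", "eth0", None], fmt_pos=1)
print(safe, unsafe, mixed)
```

The character-by-character scan only recognises the conversion letter when it directly follows `%`, so a specifier with width or flags such as `%10s` is counted as an argument but not treated as a string.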

Finally, the number of strings with haveWayToSink is returned, that is, the number of strings from which a sink can be reached.

ref2sink_bof

ref2sink_bof is almost identical to ref2sink_cmdi. As can be seen, only the sinks change: the check now treats strcpy as a sink.

[Figure: image-20230820124659840]

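Generated results

In the ghidra_extract_result folder, under the directory named after the binary, $binary_ref2sink_cmdi.result is generated.

[Figure: image-20230820125539569]

Each line of the file contains (string, referencing function, referencing address) followed by the complete path to the sink.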

taint analysis

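When --taint_check is detected among the command-line arguments, satc.py enters the taint analysis stage. It first reads the result generated in the previous step (httpd_ref2sink_cmdi.result) and enters the taint_stain_analysis function.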

ref2sink_cmdi

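First the information in httpd_ref2sink_cmdi.result is extracted. Because that data is not cleanly formatted, conv_Ghidra_output first normalizes it and writes it with the -alter2 suffix. Specifically, it matches hexadecimal numbers, extracts the addresses of the call trace, and builds a textual form of the call trace.

[Figure: image-20230820182123585]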
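
The hexadecimal matching can be sketched as follows. The exact layout of a .result line is not reproduced in this post, so `sample_line` is an invented example and `extract_call_trace` is a hypothetical helper; only the idea of pulling `0x...` tokens out of the text comes from the description above.

```python
import re

HEX_ADDR = re.compile(r"0x[0-9a-fA-F]+")


def extract_call_trace(line):
    """Return the addresses found in one result line, in order of appearance."""
    return [int(token, 16) for token in HEX_ADDR.findall(line)]


# Invented line; the real file format may differ.
sample_line = "wan_ip FUN_0001a2b0 0x0001a2f4 >> 0x0001a310 -> do_cmd >> 0x0002b004 -> system"
trace = extract_call_trace(sample_line)
print(" ".join(hex(addr) for addr in trace))
```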

Then an angr project is created and a CFG is generated.

proj = angr.Project(binary, auto_load_libs=False, use_sim_procedures=True)

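Here use_sim_procedures means "re-hook external functions with SimProcedures", that is, external functions are replaced with angr's built-in models, which reduces the number of symbolic values involved.

After the function trace above is read, execution enters main in taint.py, which passes in the function addr and taint addr and enters bugFinder. It first initializes some information, mainly related to the binary, and then enters _vuln_analysis.

It first initializes coreTaint; the generated log file is stored in /tmp/coretaint.out. The comment in the code describes it as: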

Perform a symbolic-execution-based taint analysis on a given binary to find whether it exists a tainted path between a source and a sink.

coretaint

After the log file is set up, _get_function_summaries locates certain specific kinds of functions in the angr project. These function summaries come from the Karonte paper.

def _get_function_summaries(self):
    """
    Set and returns a dictionary of function summaries
    :return: function summaries
    """

    p = self._current_p

    mem_cpy_summ = get_memcpy_like(p)
    size_of_summ = get_sizeof_like(p)
    heap_alloc_summ = get_heap_alloc(p)
    env_summ = get_env(p)
    memcmp_like_unsized = get_memcmp_like_unsized(p)
    memcmp_like_sized = get_memcmp_like_sized(p)
    atoi_like = get_atoi(p)
    nvram_summ = get_nvram(p)
    cJSON_get_summ = get_cJSON(p)

    summaries = mem_cpy_summ
    summaries.update(size_of_summ)
    summaries.update(heap_alloc_summ)
    summaries.update(env_summ)
    summaries.update(memcmp_like_unsized)
    summaries.update(memcmp_like_sized)
    summaries.update(atoi_like)
    summaries.update(nvram_summ)
    summaries.update(cJSON_get_summ)
    return summaries

Taking get_memcpy_like as an example:

def get_memcpy_like(p):
    """
    Gets and summarizes memcpy like functions within a Linux binary

    :param p: angr project
    :return: function summaries
    """

    # TODO: add sprintf
    summarized_f = {}

    addrs = get_dyn_sym_addrs(p, ['sprintf'])
    for f in addrs:
        summarized_f[f] = summary_functions.sprintf

    addrs = get_dyn_sym_addrs(p, ['snprintf'])
    for f in addrs:
        summarized_f[f] = summary_functions.snprintf

    addrs = get_dyn_sym_addrs(p, ['strcpy','stristr'])
    print addrs
    for f in addrs:
        summarized_f[f] = summary_functions.memcpy_unsized

    addrs = get_dyn_sym_addrs(p, ['strncpy', 'memcpy'])
    for f in addrs:
        summarized_f[f] = summary_functions.memcpy_sized

    return summarized_f

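As can be seen, this step maps string-copy functions in the angr project, such as sprintf and snprintf, to replacement function pointers. Take memcpy as an example, shown below. This appears to be a replacement for the original function, rewritten in Python and extended with taint labels (I do not fully understand it yet and will revisit it when I debug this part).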

def memcpy_unsized(_core, call_site_path, plt_path):
    """
    memcpy-like unsize (e.g., strcpy) function summary

    :param _core: core taint engine
    :param call_site_path: call site angr path
    :param plt_path: path to the plt (i.e., call_site.step())
    :return: None
    """

    p = _core.p

    # FIXME do taint untaint!
    plt_path_cp = plt_path.copy(copy_states=True)
    plt_state_cp = plt_path_cp.active[0]

    src = getattr(plt_state_cp.regs, arg_reg_name(p, 1))

    if _core.is_tainted(src, path=plt_path_cp) or _core.is_tainted(_core.safe_load(plt_path_cp, src), path=plt_path_cp):
        # FIXME: make the actual copy so that taint dependency will be respected
        t = _core.get_sym_val(name=_core.taint_buf, bits=_core.taint_buf_size).reversed
    else:
        plt_path_cp.step()
        assert _core.p.is_hooked(plt_path_cp.active[0].addr), "memcpy_unsized: Summary function relies on angr's " \
                                                              "sim procedure, add option use_sim_procedures to the " \
                                                              "loader"
        plt_path.step().step()
        if not plt_path.active:
            raise Exception("size of function has no active successors, not walking this path...")
        return
    dst = getattr(plt_path.active[0].regs, arg_reg_name(p, 0))
    plt_path.active[0].memory.store(dst, t)

    # restore the register values to return the call
    _restore_caller_regs(_core, call_site_path, plt_path)
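
The idea behind a function summary can be shown without angr: instead of executing the body of strcpy, the engine looks up the call target in a table and runs a small handler that only moves the taint label from the source to the destination. The toy model below tracks taint as a set of buffer addresses; the class, the address 0x400100 and the explicit untainting on a clean copy are simplifications assumed for this sketch.

```python
class ToyTaintState(object):
    """Minimal stand-in for an execution state: just a set of tainted buffers."""

    def __init__(self):
        self.tainted = set()


def strcpy_summary(state, dst, src):
    """Summary for strcpy(dst, src): propagate the taint label, skip the real copy."""
    if src in state.tainted:
        state.tainted.add(dst)
    else:
        state.tainted.discard(dst)  # overwritten with clean data


# Call-target address -> summary handler (address is hypothetical).
SUMMARIES = {0x400100: strcpy_summary}


def call(state, target_addr, *args):
    """Run a summary if one is registered; report whether the call was summarized."""
    handler = SUMMARIES.get(target_addr)
    if handler is None:
        return False  # a real engine would execute the callee symbolically
    handler(state, *args)
    return True


state = ToyTaintState()
state.tainted.add(0x1000)                # buffer holding user input
call(state, 0x400100, 0x2000, 0x1000)    # strcpy(0x2000, 0x1000)
print(sorted(hex(a) for a in state.tainted))
```

In memcpy_unsized above, the tainted case stores a fresh tainted symbolic value at dst, while the untainted case falls back to angr's own SimProcedure for the actual copy.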

Next, sink_addr is set. This is used later in coretaint.

def _find_sink_addresses(self):
    """
    Sets the sink addresses in the current binary's project
    :return: None
    """
    p = self._current_p
    # SINK_FUNCS = [('strcpy', sinks.strcpy), ('sprintf', sinks.sprintf), ('fwrite', sinks.fwrite), ('memcpy', sinks.memcpy),('system', sinks.system),('___system', sinks.system),('bstar_system', sinks.system),('popen',sinks.system),('execve',sinks.execve),("doSystemCmd",sinks.doSystemCmd),("twsystem", sinks.system),('CsteSystem', sinks.system)]

    self._sink_addrs = [(get_dyn_sym_addr(p, func[0]), func[1]) for func in SINK_FUNCS]
    self._sink_addrs += [(m, sinks.memcpy) for m in find_memcpy_like(p)]

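Next, finally, comes coretaint, which runs the taint engine. After some setup it enters self.flat_explore and then self._flat_explore, which looks like the core function. The sinks_info and sources_info it receives eventually become arguments to check_func, that is, the _check_sink function.

_check_sink takes a state as input, decides whether anything in the current state should be tainted or untainted, and then takes the result of state.step() as what to execute next. It also checks whether the basic block the current state passes through contains a sink that uses tainted data (_is_sink_and_tainted); the check is done by running a rewritten sink function against the arguments at every call site of a sink. The rest of the taint analysis is probably based on an open-source taint engine, with perhaps only this check function modified.

flat_explore itself is probably unrelated to taint analysis; it is an exploration strategy. Below is a simplified version of the _check_sink source.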

def _check_sink(self, current_path, guards_info, *_, **__):
    """
    Checks whether the taint propagation analysis lead to a sink, and performs the necessary actions
    :param current_path: angr current path
    :param guards_info: guards (ITE) information
    :return: None
    """

    try:
        current_state = current_path.active[0]
        current_addr = current_state.addr
        cfg = self._current_cfg

        self._visited_bb += 1

        # the next location to explore
        next_path = current_path.copy(copy_states=True).step()
        info = self._current_role_info
        # some unnecessary tainted variables can be dropped, e.g. variables used in comparisons
        bounded, var = self._is_any_taint_var_bounded(guards_info)
        if bounded:
            self._ct.do_recursive_untaint(var, current_path)

        # If the taint is not applied yet, apply it
        if not self._ct.taint_applied and current_addr == info[RoleInfo.CALLER_BB]:
            next_state = next_path.active[0]
            self._apply_taint(current_addr, current_path, next_state, taint_key=True)
        # eg_souce_addr is usually empty; possibly an example source addr used for testing?
        try:
            if len(next_path.active) and self._config['eg_souce_addr']:
                if next_path.active[0].addr == int(self._config['eg_souce_addr'], 16):
                    next_state = next_path.active[0]
                    self._apply_taint(current_addr, current_path, next_state, taint_key=True)
        except TimeOutException as to:
            raise to
        except:
            pass
        # check whether the basic blocks on the way contain sinks, and whether each sink
        # finds a tainted variable in its argument positions; if so, raise an alert
        if self._is_sink_and_tainted(current_path):
            delta_t = time.time() - self._analysis_starting_time
            self._raised_alert = True
            name_bin = self._ct.p.loader.main_object.binary
            self._report_alert_fun('sink', name_bin, current_path, current_addr,
                                   self._current_role_info[RoleInfo.DATAKEY],
                                   pl_name=self._current_cpf_name, report_time=delta_t)

        # tainted call address and tainted parameters
        bl = self._current_p.factory.block(current_addr)
        # if the current block ends with a call (on some architectures it may also be a jmp)
        if not len(next_path.active) and len(next_path.unconstrained) and bl.vex.jumpkind == 'Ijk_Call':
            cap = bl.capstone.insns[-1]
            vb = bl.vex
            reg_jump = cap.insn.op_str
            val_jump_reg = getattr(next_path.unconstrained[0].regs, reg_jump)
            if not hasattr(vb.next, 'tmp'):
                return
            val_jump_tmp = next_path.unconstrained[0].scratch.temps[vb.next.tmp]

            if not self.is_tainted_by_us(val_jump_reg) and not self.is_tainted_by_us(val_jump_tmp):
                # check whether val_jump_reg points to tainted data
                if self._ct.is_or_points_to_tainted_data(val_jump_reg, next_path, unconstrained=True):
                    nargs = get_arity(self._current_p, current_path.active[0].addr)
                    # find the names of the argument-passing registers for this architecture at the jump
                    for ord_reg in ordered_argument_regs[self._current_p.arch.name][:nargs]:
                        reg_name = self._current_p.arch.register_names[ord_reg]
                        if reg_name == reg_jump:
                            continue
                        # check whether an argument register contains or points to tainted data
                        # at the call; if so, treat it as an alert
                        reg_val = getattr(next_path.unconstrained[0].regs, reg_name)
                        if self._ct.is_or_points_to_tainted_data(reg_val, next_path,
                                                                 unconstrained=True) and self.is_address(reg_val):
                            delta_t = time.time() - self._analysis_starting_time
                            self._raised_alert = True
                            name_bin = self._ct.p.loader.main_object.binary
                            self._report_alert_fun('sink', name_bin, current_path, current_addr,
                                                   self._current_role_info[RoleInfo.DATAKEY],
                                                   pl_name=self._current_cpf_name, report_time=delta_t)

                    next_state = next_path.unconstrained[0]
                    hash_val = self.bv_to_hash(val_jump_tmp)
                    self._taint_names_applied.append(hash_val)
                    hash_val = self.bv_to_hash(val_jump_reg)
                    self._taint_names_applied.append(hash_val)
                    # taint the arguments of the function that next_state enters
                    self._apply_taint(current_addr, current_path, next_state)

### _apply_taint is shown below
def _apply_taint(self, addr, current_path, next_state, taint_key=False):
    """
    Applies the taint to the role function call

    :param addr: address of the role function
    :param current_path: current angr's path
    :param next_state: state at the entry of the function
    :return:
    """

    def is_arg_key(arg):
        return hasattr(arg, 'args') and type(arg.args[0]) in (int, long) and arg.args[0] == self._current_seed_addr

    p = self._current_p
    ins_args = get_ord_arguments_call(p, addr)
    if not ins_args:
        ins_args = get_any_arguments_call(p, addr)

    if not are_parameters_in_registers(p):
        raise Exception("Parameters not in registers: Implement me")

    for stmt in ins_args:
        reg_off = stmt.offset
        reg_name = p.arch.register_names[reg_off]
        val_arg = getattr(next_state.regs, reg_name)
        size = None
        if is_arg_key(val_arg):
            if not taint_key:
                continue
            n_bytes = p.loader.memory.read_bytes(val_arg.args[0], STR_LEN)
            size = len(get_mem_string(n_bytes)) * 8
        if val_arg.concrete and val_arg.args[0] < p.loader.main_object.min_addr:
            continue
        log.info('taint applied to %s:%s' % (reg_name, str(val_arg)))
        self._ct.apply_taint(current_path, val_arg, reg_name, size)


### back in _check_sink (continued, still inside the outer try)
        # eventually if we are in a loop guarded by a tainted variable
        next_active = next_path.active
        if len(next_active) > 1:  # more than one successor: may be a branch, or caused by a loop
            history_addrs = [t for t in current_state.history.bbl_addrs]
            seen_addr = [a.addr for a in next_active if a.addr in history_addrs]  # successors whose address already appears in the history

            if len(seen_addr) == 0:
                return

            back_jumps = [a for a in seen_addr if a < current_addr]
            if len(back_jumps) == 0:
                return
            # take the first of the back-jumps
            bj = back_jumps[0]
            node_s = cfg.get_any_node(bj)
            node_f = cfg.get_any_node(current_addr)

            if not node_s or not node_f:
                return
            # functions containing the back-jump target block and the current block
            fun_s = node_s.function_address
            fun_f = node_f.function_address

            if fun_s != fun_f:
                return

            idx_s = history_addrs.index(bj)
            # starting from the first visit of the back-jump target, make sure every address
            # since then is in the same function, to confirm it is a real back-jump
            for a in history_addrs[idx_s:]:
                n = cfg.get_any_node(a)
                if not n:
                    continue

                if n.function_address != fun_s:
                    return

            # if we have a back-jump satisfiying all the conditions
            cond_guard = [g for g in next_active[0].guards][-1]

            if hasattr(cond_guard, 'args') and len(cond_guard.args) == 2 and \
                    self._ct.taint_buf in str(cond_guard.args[0]) and \
                    self._ct.taint_buf in str(cond_guard.args[1]):
                delta_t = time.time() - self._analysis_starting_time
                self._raised_alert = True
                name_bin = self._ct.p.loader.main_object.binary
                # raise a loop alert
                self._report_alert_fun('loop', name_bin, current_path, current_addr, cond_guard,
                                       pl_name=self._current_cpf_name, report_time=delta_t)
    except TimeOutException as to:
        raise to
    except Exception as e:
        log.error("Something went terribly wrong: %s" % str(e))
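
The loop detection at the end of _check_sink can be isolated into a small pure function. The sketch below takes the visited block addresses, the successor addresses and a block-to-function map (standing in for the CFG lookup) and returns the back-jump target if the conditions in the listing hold. It omits the final test on the tainted guard condition, and all names are illustrative.

```python
def find_back_jump(history_addrs, successor_addrs, current_addr, function_of):
    """Return the address of an intra-function back-jump target, or None."""
    # Successors that were already visited and lie before the current block.
    seen = [a for a in successor_addrs if a in history_addrs]
    back_jumps = [a for a in seen if a < current_addr]
    if not back_jumps:
        return None
    target = back_jumps[0]
    func = function_of.get(target)
    if func is None or func != function_of.get(current_addr):
        return None
    # Everything executed since the first visit must stay in the same function.
    first_visit = history_addrs.index(target)
    for addr in history_addrs[first_visit:]:
        owner = function_of.get(addr)
        if owner is not None and owner != func:
            return None
    return target


function_of = {0x100: "f", 0x110: "f", 0x120: "f", 0x200: "g"}
loop_target = find_back_jump([0x100, 0x110, 0x120], [0x110, 0x130], 0x120, function_of)
print(hex(loop_target))
```

Requiring a lower address is only a heuristic for "jumps backwards": it relies on loop headers being laid out before loop bodies.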

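Once this algorithm has been applied to every basic block during exploration, the analysis can report whether a sink function is reached and whether tainted data is present on arrival. This ends the _vuln_analysis stage. This part probably reuses KARONTE's code directly, including its BDG.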

try:
    self._ct.run(s, (), (), summarized_f=summarized_f, force_thumb=False, check_func=self._check_sink,
                 init_bss=False)
except TimeOutException:
    log.warning("Hard timeout triggered")
except Exception as e:
    log.error("Something went terribly wrong: %s" % str(e))

self._ct.unset_alarm()

# stats
self._stats[bdg_node.bin]['to'] += 1 if self._ct.triggered_to() else 0
self._stats[bdg_node.bin]['visited_bb'] += self._visited_bb
self._stats[bdg_node.bin]['n_paths'] += self._ct.n_paths
self._stats[bdg_node.bin]['ana_time'] += (time.time() - ana_start_time)
self._stats[bdg_node.bin]['n_runs'] += 1

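Next we return to bugFinder, which is where _vuln_analysis was called. What follows is again similar to KARONTE: the search continues in the child binaries of the BDG by calling the same function, so I will not go over it again.

Questions & thoughts

While reading the code I noticed a problem: of all the paths Ghidra finds at the beginning, the tainted portion is extracted but then used rather crudely. It is only used in apply_taint, to check whether the called function lies on the previously found taint path. If the earlier taint trace could be used to prune the binary, processing time might be reduced; the Beacon paper may be a useful reference here.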
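
One possible reading of this pruning idea, sketched on a toy call graph: compute the set of functions from which the sink is reachable at all, and let the explorer skip calls into everything else. This is only an illustration of the thought above; it is not how SaTC or Beacon is implemented, and the function and graph names are made up.

```python
from collections import deque


def functions_reaching(call_graph, sink):
    """Backward reachability: every function with some call chain to `sink`."""
    callers = {}
    for caller, callees in call_graph.items():
        for callee in callees:
            callers.setdefault(callee, set()).add(caller)
    reachable = {sink}
    queue = deque([sink])
    while queue:
        func = queue.popleft()
        for caller in callers.get(func, ()):
            if caller not in reachable:
                reachable.add(caller)
                queue.append(caller)
    return reachable


call_graph = {
    "main": ["handler", "log_init"],
    "handler": ["do_cmd", "render"],
    "do_cmd": ["system"],
    "render": ["puts"],
    "log_init": ["puts"],
}
keep = functions_reaching(call_graph, "system")
# Calls into functions outside `keep` cannot lead to the sink and could be skipped.
print(sorted(keep))
```

Skipping a callee entirely is only sound if it cannot affect tainted data, so a real implementation would still need summaries or a conservative fallback for pruned functions that copy or clear buffers.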
