Package: release.debian.org
Severity: normal
User: release.debian....@packages.debian.org
Usertags: unblock
Please unblock package python-jwcrypto The new upstream release is needed to fix: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=925457 diff -Nru python-jwcrypto-0.4.2/debian/changelog python-jwcrypto-0.6.0/debian/changelog --- python-jwcrypto-0.4.2/debian/changelog 2017-12-23 10:00:03.000000000 +0200 +++ python-jwcrypto-0.6.0/debian/changelog 2019-04-02 09:05:15.000000000 +0300 @@ -1,3 +1,11 @@ +python-jwcrypto (0.6.0-1) unstable; urgency=medium + + * New upstream release. (Closes: #925457) + * control: Update vcs urls. + * control: Drop X-Python-Version*. + + -- Timo Aaltonen <tjaal...@debian.org> Tue, 02 Apr 2019 09:05:15 +0300 + python-jwcrypto (0.4.2-1) unstable; urgency=medium * New upstream release. diff -Nru python-jwcrypto-0.4.2/debian/control python-jwcrypto-0.6.0/debian/control --- python-jwcrypto-0.4.2/debian/control 2017-12-23 09:52:28.000000000 +0200 +++ python-jwcrypto-0.6.0/debian/control 2019-04-02 09:04:58.000000000 +0300 @@ -14,12 +14,10 @@ python3-cryptography, python3-nose, python3-setuptools, -X-Python-Version: >= 2.7 -X-Python3-Version: >= 3.3 Standards-Version: 4.1.2 Homepage: https://github.com/latchset/jwcrypto -Vcs-Git: https://anonscm.debian.org/git/pkg-freeipa/python-jwcrypto.git -Vcs-Browser: https://anonscm.debian.org/cgit/pkg-freeipa/python-jwcrypto.git +Vcs-Git: https://salsa.debian.org/freeipa-team/python-jwcrypto.git +Vcs-Browser: https://salsa.debian.org/freeipa-team/python-jwcrypto Package: python-jwcrypto Architecture: all diff -Nru python-jwcrypto-0.4.2/docs/source/conf.py python-jwcrypto-0.6.0/docs/source/conf.py --- python-jwcrypto-0.4.2/docs/source/conf.py 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/docs/source/conf.py 2018-11-05 17:14:47.000000000 +0200 @@ -46,16 +46,16 @@ # General information about the project. 
project = u'JWCrypto' -copyright = u'2016-2017, JWCrypto Contributors' +copyright = u'2016-2018, JWCrypto Contributors' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -version = '0.4' +version = '0.6' # The full version, including alpha/beta/rc tags. -release = '0.4.2' +release = '0.6' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff -Nru python-jwcrypto-0.4.2/docs/source/jwe.rst python-jwcrypto-0.6.0/docs/source/jwe.rst --- python-jwcrypto-0.4.2/docs/source/jwe.rst 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/docs/source/jwe.rst 2018-11-05 17:14:47.000000000 +0200 @@ -51,6 +51,9 @@ Examples -------- +Symmetric keys +~~~~~~~~~~~~~~ + Encrypt a JWE token:: >>> from jwcrypto import jwk, jwe >>> from jwcrypto.common import json_encode @@ -67,3 +70,29 @@ >>> jwetoken.deserialize(enc) >>> jwetoken.decrypt(key) >>> payload = jwetoken.payload + +Asymmetric keys +~~~~~~~~~~~~~~~ + +Encrypt a JWE token:: + >>> from jwcrypto import jwk, jwe + >>> from jwcrypto.common import json_encode, json_decode + >>> public_key = jwk.JWK() + >>> private_key = jwk.JWK.generate(kty='RSA', size=2048) + >>> public_key.import_key(**json_decode(private_key.export_public())) + >>> payload = "My Encrypted message" + >>> protected_header = { + "alg": "RSA-OAEP-256", + "enc": "A256CBC-HS512", + "typ": "JWE", + "kid": public_key.thumbprint(), + } + >>> jwetoken = jwe.JWE(payload.encode('utf-8'), + recipient=public_key, + protected=protected_header) + >>> enc = jwetoken.serialize() + +Decrypt a JWE token:: + >>> jwetoken = jwe.JWE() + >>> jwetoken.deserialize(enc, key=private_key) + >>> payload = jwetoken.payload diff -Nru python-jwcrypto-0.4.2/jwcrypto/common.py python-jwcrypto-0.6.0/jwcrypto/common.py --- python-jwcrypto-0.4.2/jwcrypto/common.py 2017-08-01 
18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/jwcrypto/common.py 2018-11-05 17:14:47.000000000 +0200 @@ -16,12 +16,12 @@ def base64url_decode(payload): - l = len(payload) % 4 - if l == 2: + size = len(payload) % 4 + if size == 2: payload += '==' - elif l == 3: + elif size == 3: payload += '=' - elif l != 0: + elif size != 0: raise ValueError('Invalid base64 string') return urlsafe_b64decode(payload.encode('utf-8')) diff -Nru python-jwcrypto-0.4.2/jwcrypto/jwa.py python-jwcrypto-0.6.0/jwcrypto/jwa.py --- python-jwcrypto-0.4.2/jwcrypto/jwa.py 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/jwcrypto/jwa.py 2018-11-05 17:14:47.000000000 +0200 @@ -14,6 +14,7 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, aes_key_wrap from cryptography.hazmat.primitives.padding import PKCS7 import six @@ -141,15 +142,15 @@ def sign(self, key, payload): skey = key.get_op_key('sign', self._curve) signature = skey.sign(payload, ec.ECDSA(self.hashfn)) - r, s = ec_utils.decode_rfc6979_signature(signature) - l = key.get_curve(self._curve).key_size - return _encode_int(r, l) + _encode_int(s, l) + r, s = ec_utils.decode_dss_signature(signature) + size = key.get_curve(self._curve).key_size + return _encode_int(r, size) + _encode_int(s, size) def verify(self, key, payload, signature): pkey = key.get_op_key('verify', self._curve) r = signature[:len(signature) // 2] s = signature[len(signature) // 2:] - enc_signature = ec_utils.encode_rfc6979_signature( + enc_signature = ec_utils.encode_dss_signature( int(hexlify(r), 16), int(hexlify(s), 16)) pkey.verify(enc_signature, payload, ec.ECDSA(self.hashfn)) @@ -439,49 +440,14 @@ if not cek: cek = _randombits(bitsize) - # Implement RFC 3394 Key Unwrap - 2.2.2 - # TODO: Use cryptography once issue #1733 is 
resolved - iv = 'a6a6a6a6a6a6a6a6' - a = unhexlify(iv) - r = [cek[i:i + 8] for i in range(0, len(cek), 8)] - n = len(r) - for j in range(0, 6): - for i in range(0, n): - e = Cipher(algorithms.AES(rk), modes.ECB(), - backend=self.backend).encryptor() - b = e.update(a + r[i]) + e.finalize() - a = _encode_int(_decode_int(b[:8]) ^ ((n * j) + i + 1), 64) - r[i] = b[-8:] - ek = a - for i in range(0, n): - ek += r[i] + ek = aes_key_wrap(rk, cek, default_backend()) + return {'cek': cek, 'ek': ek} def unwrap(self, key, bitsize, ek, headers): rk = self._get_key(key, 'decrypt') - # Implement RFC 3394 Key Unwrap - 2.2.3 - # TODO: Use cryptography once issue #1733 is resolved - iv = 'a6a6a6a6a6a6a6a6' - aiv = unhexlify(iv) - - r = [ek[i:i + 8] for i in range(0, len(ek), 8)] - a = r.pop(0) - n = len(r) - for j in range(5, -1, -1): - for i in range(n - 1, -1, -1): - da = _decode_int(a) - atr = _encode_int((da ^ ((n * j) + i + 1)), 64) + r[i] - d = Cipher(algorithms.AES(rk), modes.ECB(), - backend=self.backend).decryptor() - b = d.update(atr) + d.finalize() - a = b[:8] - r[i] = b[-8:] - - if a != aiv: - raise RuntimeError('Decryption Failed') - - cek = b''.join(r) + cek = aes_key_unwrap(rk, ek, default_backend()) if _bitsize(cek) != bitsize: raise InvalidJWEKeyLength(bitsize, _bitsize(cek)) return cek @@ -761,23 +727,24 @@ def wrap(self, key, bitsize, cek, headers): self._check_key(key) + dk_size = self.keysize if self.keysize is None: if cek is not None: raise InvalidJWEOperation('ECDH-ES cannot use an existing CEK') alg = headers['enc'] + dk_size = bitsize else: - bitsize = self.keysize alg = headers['alg'] epk = JWK.generate(kty=key.key_type, crv=key.key_curve) dk = self._derive(epk.get_op_key('unwrapKey'), key.get_op_key('wrapKey'), - alg, bitsize, headers) + alg, dk_size, headers) if self.keysize is None: ret = {'cek': dk} else: - aeskw = self.aeskwmap[bitsize]() + aeskw = self.aeskwmap[self.keysize]() kek = JWK(kty="oct", use="enc", k=base64url_encode(dk)) ret = 
aeskw.wrap(kek, bitsize, cek, headers) @@ -788,20 +755,21 @@ if 'epk' not in headers: raise ValueError('Invalid Header, missing "epk" parameter') self._check_key(key) + dk_size = self.keysize if self.keysize is None: alg = headers['enc'] + dk_size = bitsize else: - bitsize = self.keysize alg = headers['alg'] epk = JWK(**headers['epk']) dk = self._derive(key.get_op_key('unwrapKey'), epk.get_op_key('wrapKey'), - alg, bitsize, headers) + alg, dk_size, headers) if self.keysize is None: return dk else: - aeskw = self.aeskwmap[bitsize]() + aeskw = self.aeskwmap[self.keysize]() kek = JWK(kty="oct", use="enc", k=base64url_encode(dk)) cek = aeskw.unwrap(kek, bitsize, ek, headers) return cek @@ -828,7 +796,7 @@ class _EcdhEsAes256Kw(_EcdhEs): name = 'ECDH-ES+A256KW' - description = 'ECDH-ES using Concat KDF and "A128KW" wrapping' + description = 'ECDH-ES using Concat KDF and "A256KW" wrapping' keysize = 256 algorithm_usage_location = 'alg' algorithm_use = 'kex' diff -Nru python-jwcrypto-0.4.2/jwcrypto/jwe.py python-jwcrypto-0.6.0/jwcrypto/jwe.py --- python-jwcrypto-0.4.2/jwcrypto/jwe.py 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/jwcrypto/jwe.py 2018-11-05 17:14:47.000000000 +0200 @@ -3,6 +3,7 @@ import zlib from jwcrypto import common +from jwcrypto.common import JWException from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto.common import json_decode, json_encode from jwcrypto.jwa import JWA @@ -40,7 +41,7 @@ """Default allowed algorithms""" -class InvalidJWEData(Exception): +class InvalidJWEData(JWException): """Invalid JWE Object. 
This exception is raised when the JWE Object is invalid and/or @@ -58,7 +59,7 @@ super(InvalidJWEData, self).__init__(msg) -# These have been moved to jwcrypto.common, maintain here for bacwards compat +# These have been moved to jwcrypto.common, maintain here for backwards compat InvalidCEKeyLength = common.InvalidCEKeyLength InvalidJWEKeyLength = common.InvalidJWEKeyLength InvalidJWEKeyType = common.InvalidJWEKeyType @@ -108,7 +109,7 @@ json_decode(unprotected) # check header encoding self.objects['unprotected'] = unprotected if algs: - self.allowed_algs = algs + self._allowed_algs = algs if recipient: self.add_recipient(recipient, header=header) @@ -269,7 +270,19 @@ if compact: for invalid in 'aad', 'unprotected': if invalid in self.objects: - raise InvalidJWEOperation("Can't use compact encoding") + raise InvalidJWEOperation( + "Can't use compact encoding when the '%s' parameter" + "is set" % invalid) + if 'protected' not in self.objects: + raise InvalidJWEOperation( + "Can't use compat encoding without protected headers") + else: + ph = json_decode(self.objects['protected']) + for required in 'alg', 'enc': + if required not in ph: + raise InvalidJWEOperation( + "Can't use compat encoding, '%s' must be in the " + "protected header" % required) if 'recipients' in self.objects: if len(self.objects['recipients']) != 1: raise InvalidJWEOperation("Invalid number of recipients") diff -Nru python-jwcrypto-0.4.2/jwcrypto/jwk.py python-jwcrypto-0.6.0/jwcrypto/jwk.py --- python-jwcrypto-0.4.2/jwcrypto/jwk.py 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/jwcrypto/jwk.py 2018-11-05 17:14:47.000000000 +0200 @@ -1,8 +1,9 @@ # Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file import os - from binascii import hexlify, unhexlify +from collections import namedtuple +from enum import Enum from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -12,6 +13,7 @@ from six import iteritems +from jwcrypto.common import 
JWException from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto.common import json_decode, json_encode @@ -22,36 +24,59 @@ 'oct': 'Octet sequence'} """Registry of valid Key Types""" + # RFC 7518 - 7.5 # It is part of the JWK Parameters Registry, but we want a more # specific map for internal usage -JWKValuesRegistry = {'EC': {'crv': ('Curve', 'Public', 'Required'), - 'x': ('X Coordinate', 'Public', 'Required'), - 'y': ('Y Coordinate', 'Public', 'Required'), - 'd': ('ECC Private Key', 'Private', None)}, - 'RSA': {'n': ('Modulus', 'Public', 'Required'), - 'e': ('Exponent', 'Public', 'Required'), - 'd': ('Private Exponent', 'Private', None), - 'p': ('First Prime Factor', 'Private', None), - 'q': ('Second Prime Factor', 'Private', None), - 'dp': ('First Factor CRT Exponent', 'Private', - None), - 'dq': ('Second Factor CRT Exponent', 'Private', - None), - 'qi': ('First CRT Coefficient', 'Private', None)}, - 'oct': {'k': ('Key Value', 'Private', 'Required')}} +class ParmType(Enum): + name = 'A string with a name' + b64 = 'Base64url Encoded' + b64U = 'Base64urlUint Encoded' + unsupported = 'Unsupported Parameter' + + +JWKParameter = namedtuple('Parameter', 'description public required type') +JWKValuesRegistry = { + 'EC': { + 'crv': JWKParameter('Curve', True, True, ParmType.name), + 'x': JWKParameter('X Coordinate', True, True, ParmType.b64), + 'y': JWKParameter('Y Coordinate', True, True, ParmType.b64), + 'd': JWKParameter('ECC Private Key', False, False, ParmType.b64), + }, + 'RSA': { + 'n': JWKParameter('Modulus', True, True, ParmType.b64), + 'e': JWKParameter('Exponent', True, True, ParmType.b64U), + 'd': JWKParameter('Private Exponent', False, False, ParmType.b64U), + 'p': JWKParameter('First Prime Factor', False, False, ParmType.b64U), + 'q': JWKParameter('Second Prime Factor', False, False, ParmType.b64U), + 'dp': JWKParameter('First Factor CRT Exponent', + False, False, ParmType.b64U), + 'dq': JWKParameter('Second Factor CRT Exponent', + 
False, False, ParmType.b64U), + 'qi': JWKParameter('First CRT Coefficient', + False, False, ParmType.b64U), + 'oth': JWKParameter('Other Primes Info', + False, False, ParmType.unsupported), + }, + 'oct': { + 'k': JWKParameter('Key Value', False, True, ParmType.b64), + } +} """Registry of valid key values""" -JWKParamsRegistry = {'kty': ('Key Type', 'Public', ), - 'use': ('Public Key Use', 'Public'), - 'key_ops': ('Key Operations', 'Public'), - 'alg': ('Algorithm', 'Public'), - 'kid': ('Key ID', 'Public'), - 'x5u': ('X.509 URL', 'Public'), - 'x5c': ('X.509 Certificate Chain', 'Public'), - 'x5t': ('X.509 Certificate SHA-1 Thumbprint', 'Public'), - 'x5t#S256': ('X.509 Certificate SHA-256 Thumbprint', - 'Public')} +JWKParamsRegistry = { + 'kty': JWKParameter('Key Type', True, None, None), + 'use': JWKParameter('Public Key Use', True, None, None), + 'key_ops': JWKParameter('Key Operations', True, None, None), + 'alg': JWKParameter('Algorithm', True, None, None), + 'kid': JWKParameter('Key ID', True, None, None), + 'x5u': JWKParameter('X.509 URL', True, None, None), + 'x5c': JWKParameter('X.509 Certificate Chain', True, None, None), + 'x5t': JWKParameter('X.509 Certificate SHA-1 Thumbprint', + True, None, None), + 'x5t#S256': JWKParameter('X.509 Certificate SHA-256 Thumbprint', + True, None, None) +} """Regstry of valid key parameters""" # RFC 7518 - 7.6 @@ -83,7 +108,7 @@ 'secp521r1': 'P-521'} -class InvalidJWKType(Exception): +class InvalidJWKType(JWException): """Invalid JWK Type Exception. This exception is raised when an invalid parameter type is used. @@ -98,7 +123,7 @@ self.value, list(JWKTypesRegistry.keys())) -class InvalidJWKUsage(Exception): +class InvalidJWKUsage(JWException): """Invalid JWK usage Exception. This exception is raised when an invalid key usage is requested, @@ -123,7 +148,7 @@ valid) -class InvalidJWKOperation(Exception): +class InvalidJWKOperation(JWException): """Invalid JWK Operation Exception. 
This exception is raised when an invalid key operation is requested, @@ -150,7 +175,7 @@ valid) -class InvalidJWKValue(Exception): +class InvalidJWKValue(JWException): """Invalid JWK Value Exception. This exception is raised when an invalid/unknown value is used in the @@ -210,6 +235,7 @@ @classmethod def generate(cls, **kwargs): obj = cls() + kty = None try: kty = kwargs['kty'] gen = getattr(obj, '_generate_%s' % kty) @@ -219,6 +245,7 @@ return obj def generate_key(self, **params): + kty = None try: kty = params.pop('generate') gen = getattr(self, '_generate_%s' % kty) @@ -346,8 +373,26 @@ names.remove(name) for name, val in iteritems(JWKValuesRegistry[kty]): - if val[2] == 'Required' and name not in self._key: + if val.required and name not in self._key: raise InvalidJWKValue('Missing required value %s' % name) + if val.type == ParmType.unsupported and name in self._key: + raise InvalidJWKValue('Unsupported parameter %s' % name) + if val.type == ParmType.b64 and name in self._key: + # Check that the value is base64url encoded + try: + base64url_decode(self._key[name]) + except Exception: # pylint: disable=broad-except + raise InvalidJWKValue( + '"%s" is not base64url encoded' % name + ) + if val[3] == ParmType.b64U and name in self._key: + # Check that the value is Base64urlUInt encoded + try: + self._decode_int(self._key[name]) + except Exception: # pylint: disable=broad-except + raise InvalidJWKValue( + '"%s" is not Base64urlUInt encoded' % name + ) # Unknown key parameters are allowed # Let's just store them out of the way @@ -385,6 +430,20 @@ ' "key_ops" values specified at' ' the same time') + @classmethod + def from_json(cls, key): + """Creates a RFC 7517 JWK from the standard JSON format. + + :param key: The RFC 7517 representation of a JWK. 
+ """ + obj = cls() + try: + jkey = json_decode(key) + except Exception as e: # pylint: disable=broad-except + raise InvalidJWKValue(e) + obj.import_key(**jkey) + return obj + def export(self, private_key=True): """Exports the key in the standard JSON format. Exports the key regardless of type, if private_key is False @@ -405,19 +464,23 @@ It fails if one is not available like when this function is called on a symmetric key. """ + pub = self._public_params() + return json_encode(pub) + + def _public_params(self): if not self.has_public: raise InvalidJWKType("No public key available") pub = {} preg = JWKParamsRegistry for name in preg: - if preg[name][1] == 'Public': + if preg[name].public: if name in self._params: pub[name] = self._params[name] reg = JWKValuesRegistry[self._params['kty']] for param in reg: - if reg[param][1] == 'Public': + if reg[param].public: pub[param] = self._key[param] - return json_encode(pub) + return pub def _export_all(self): d = dict() @@ -439,6 +502,10 @@ return self._export_all() raise InvalidJWKType("Not a symmetric key") + def public(self): + pub = self._public_params() + return JWK(**pub) + @property def has_public(self): """Whether this JWK has an asymmetric Public key.""" @@ -446,7 +513,7 @@ return False reg = JWKValuesRegistry[self._params['kty']] for value in reg: - if reg[value][1] == 'Public' and value in self._key: + if reg[value].public and value in self._key: return True @property @@ -456,7 +523,7 @@ return False reg = JWKValuesRegistry[self._params['kty']] for value in reg: - if reg[value][1] == 'Private' and value in self._key: + if not reg[value].public and value in self._key: return True return False @@ -700,7 +767,7 @@ t = {'kty': self._params['kty']} for name, val in iteritems(JWKValuesRegistry[t['kty']]): - if val[2] == 'Required': + if val.required: t[name] = self._key[name] digest = hashes.Hash(hashalg, backend=default_backend()) digest.update(bytes(json_encode(t).encode('utf8'))) @@ -733,6 +800,12 @@ super(JWKSet, 
self).__setitem__('keys', _JWKkeys()) self.update(*args, **kwargs) + def __iter__(self): + return self['keys'].__iter__() + + def __contains__(self, key): + return self['keys'].__contains__(key) + def __setitem__(self, key, val): if key == 'keys': self['keys'].add(val) @@ -769,7 +842,7 @@ """ try: jwkset = json_decode(keyset) - except: + except Exception: # pylint: disable=broad-except raise InvalidJWKValue() if 'keys' not in jwkset: @@ -782,8 +855,6 @@ else: self[k] = v - return self - @classmethod def from_json(cls, keyset): """Creates a RFC 7517 keyset from the standard JSON format. @@ -791,7 +862,8 @@ :param keyset: The RFC 7517 representation of a JOSE Keyset. """ obj = cls() - return obj.import_keyset(keyset) + obj.import_keyset(keyset) + return obj def get_key(self, kid): """Gets a key from the set. diff -Nru python-jwcrypto-0.4.2/jwcrypto/jws.py python-jwcrypto-0.6.0/jwcrypto/jws.py --- python-jwcrypto-0.4.2/jwcrypto/jws.py 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/jwcrypto/jws.py 2018-11-05 17:14:47.000000000 +0200 @@ -1,5 +1,8 @@ # Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file +from collections import namedtuple + +from jwcrypto.common import JWException from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto.common import json_decode, json_encode from jwcrypto.jwa import JWA @@ -8,18 +11,24 @@ # RFC 7515 - 9.1 # name: (description, supported?) 
-JWSHeaderRegistry = {'alg': ('Algorithm', True), - 'jku': ('JWK Set URL', False), - 'jwk': ('JSON Web Key', False), - 'kid': ('Key ID', True), - 'x5u': ('X.509 URL', False), - 'x5c': ('X.509 Certificate Chain', False), - 'x5t': ('X.509 Certificate SHA-1 Thumbprint', False), - 'x5t#S256': ('X.509 Certificate SHA-256 Thumbprint', - False), - 'typ': ('Type', True), - 'cty': ('Content Type', True), - 'crit': ('Critical', True)} +JWSHeaderParameter = namedtuple('Parameter', + 'description mustprotect supported') +JWSHeaderRegistry = { + 'alg': JWSHeaderParameter('Algorithm', False, True), + 'jku': JWSHeaderParameter('JWK Set URL', False, False), + 'jwk': JWSHeaderParameter('JSON Web Key', False, False), + 'kid': JWSHeaderParameter('Key ID', False, True), + 'x5u': JWSHeaderParameter('X.509 URL', False, False), + 'x5c': JWSHeaderParameter('X.509 Certificate Chain', False, False), + 'x5t': JWSHeaderParameter( + 'X.509 Certificate SHA-1 Thumbprint', False, False), + 'x5t#S256': JWSHeaderParameter( + 'X.509 Certificate SHA-256 Thumbprint', False, False), + 'typ': JWSHeaderParameter('Type', False, True), + 'cty': JWSHeaderParameter('Content Type', False, True), + 'crit': JWSHeaderParameter('Critical', True, True), + 'b64': JWSHeaderParameter('Base64url-Encode Payload', True, True) +} """Registry of valid header parameters""" default_allowed_algs = [ @@ -30,7 +39,7 @@ """Default allowed algorithms""" -class InvalidJWSSignature(Exception): +class InvalidJWSSignature(JWException): """Invalid JWS Signature. This exception is raised when a signature cannot be validated. @@ -47,7 +56,7 @@ super(InvalidJWSSignature, self).__init__(msg) -class InvalidJWSObject(Exception): +class InvalidJWSObject(JWException): """Invalid JWS Object. This exception is raised when the JWS Object is invalid and/or @@ -63,7 +72,7 @@ super(InvalidJWSObject, self).__init__(msg) -class InvalidJWSOperation(Exception): +class InvalidJWSOperation(JWException): """Invalid JWS Object. 
This exception is raised when a requested operation cannot @@ -113,11 +122,16 @@ if header is not None: if isinstance(header, dict): + self.header = header header = json_encode(header) + else: + self.header = json_decode(header) + self.protected = base64url_encode(header.encode('utf-8')) else: + self.header = dict() self.protected = '' - self.payload = base64url_encode(payload) + self.payload = payload def _jwa(self, name, allowed): if allowed is None: @@ -126,12 +140,22 @@ raise InvalidJWSOperation('Algorithm not allowed') return JWA.signing_alg(name) + def _payload(self): + if self.header.get('b64', True): + return base64url_encode(self.payload).encode('utf-8') + else: + if isinstance(self.payload, bytes): + return self.payload + else: + return self.payload.encode('utf-8') + def sign(self): """Generates a signature""" - sigin = ('.'.join([self.protected, self.payload])).encode('utf-8') + payload = self._payload() + sigin = b'.'.join([self.protected.encode('utf-8'), payload]) signature = self.engine.sign(self.key, sigin) return {'protected': self.protected, - 'payload': self.payload, + 'payload': payload, 'signature': base64url_encode(signature)} def verify(self, signature): @@ -140,7 +164,8 @@ :raises InvalidJWSSignature: if the verification fails. """ try: - sigin = ('.'.join([self.protected, self.payload])).encode('utf-8') + payload = self._payload() + sigin = b'.'.join([self.protected.encode('utf-8'), payload]) self.engine.verify(self.key, sigin, signature) except Exception as e: # pylint: disable=broad-except raise InvalidJWSSignature('Verification failed', repr(e)) @@ -164,16 +189,6 @@ self.verifylog = None self._allowed_algs = None - def _check_crit(self, crit): - for k in crit: - if k not in JWSHeaderRegistry: - raise InvalidJWSSignature('Unknown critical header: ' - '"%s"' % k) - else: - if not JWSHeaderRegistry[k][1]: - raise InvalidJWSSignature('Unsupported critical ' - 'header: "%s"' % k) - @property def allowed_algs(self): """Allowed algorithms. 
@@ -197,31 +212,61 @@ def is_valid(self): return self.objects.get('valid', False) - def _merge_headers(self, h1, h2): - for k in list(h1.keys()): - if k in h2: - raise InvalidJWSObject('Duplicate header: "%s"' % k) - h1.update(h2) - return h1 + # TODO: allow caller to specify list of headers it understands + def _merge_check_headers(self, protected, *headers): + header = None + crit = [] + if protected is not None: + if 'crit' in protected: + crit = protected['crit'] + # Check immediately if we support these critical headers + for k in crit: + if k not in JWSHeaderRegistry: + raise InvalidJWSObject( + 'Unknown critical header: "%s"' % k) + else: + if not JWSHeaderRegistry[k][1]: + raise InvalidJWSObject( + 'Unsupported critical header: "%s"' % k) + header = protected + if 'b64' in header: + if not isinstance(header['b64'], bool): + raise InvalidJWSObject('b64 header must be a boolean') + + for hn in headers: + if hn is None: + continue + if header is None: + header = dict() + for h in list(hn.keys()): + if h in JWSHeaderRegistry: + if JWSHeaderRegistry[h].mustprotect: + raise InvalidJWSObject('"%s" must be protected' % h) + if h in header: + raise InvalidJWSObject('Duplicate header: "%s"' % h) + header.update(hn) + + for k in crit: + if k not in header: + raise InvalidJWSObject('Missing critical header "%s"' % k) + + return header # TODO: support selecting key with 'kid' and passing in multiple keys def _verify(self, alg, key, payload, signature, protected, header=None): - # verify it is a valid JSON object and keep a decode copy + p = dict() + # verify it is a valid JSON object and decode if protected is not None: p = json_decode(protected) - else: - p = dict() - if not isinstance(p, dict): - raise InvalidJWSSignature('Invalid Protected header') + if not isinstance(p, dict): + raise InvalidJWSSignature('Invalid Protected header') # merge heders, and verify there are no duplicates if header: if not isinstance(header, dict): raise InvalidJWSSignature('Invalid 
Unprotected header') - p = self._merge_headers(p, header) - # verify critical headers - # TODO: allow caller to specify list of headers it understands - if 'crit' in p: - self._check_crit(p['crit']) + + # Merge and check (critical) headers + self._merge_check_headers(p, header) # check 'alg' is present if alg is None and 'alg' not in p: raise InvalidJWSSignature('No "alg" in headers') @@ -282,6 +327,33 @@ raise InvalidJWSSignature('Verification failed for all ' 'signatures' + repr(self.verifylog)) + def _deserialize_signature(self, s): + o = dict() + o['signature'] = base64url_decode(str(s['signature'])) + if 'protected' in s: + p = base64url_decode(str(s['protected'])) + o['protected'] = p.decode('utf-8') + if 'header' in s: + o['header'] = s['header'] + return o + + def _deserialize_b64(self, o, protected): + if protected is None: + b64n = None + else: + p = json_decode(protected) + b64n = p.get('b64') + if b64n is not None: + if not isinstance(b64n, bool): + raise InvalidJWSObject('b64 header must be boolean') + b64 = o.get('b64') + if b64 == b64n: + return + elif b64 is None: + o['b64'] = b64n + else: + raise InvalidJWSObject('conflicting b64 values') + def deserialize(self, raw_jws, key=None, alg=None): """Deserialize a JWS token. 
@@ -304,25 +376,21 @@ try: try: djws = json_decode(raw_jws) - o['payload'] = base64url_decode(str(djws['payload'])) if 'signatures' in djws: o['signatures'] = list() for s in djws['signatures']: - os = dict() - os['signature'] = base64url_decode(str(s['signature'])) - if 'protected' in s: - p = base64url_decode(str(s['protected'])) - os['protected'] = p.decode('utf-8') - if 'header' in s: - os['header'] = s['header'] + os = self._deserialize_signature(s) o['signatures'].append(os) + self._deserialize_b64(o, os.get('protected')) else: - o['signature'] = base64url_decode(str(djws['signature'])) - if 'protected' in djws: - p = base64url_decode(str(djws['protected'])) - o['protected'] = p.decode('utf-8') - if 'header' in djws: - o['header'] = djws['header'] + o = self._deserialize_signature(djws) + self._deserialize_b64(o, o.get('protected')) + + if 'payload' in djws: + if o.get('b64', True): + o['payload'] = base64url_decode(str(djws['payload'])) + else: + o['payload'] = djws['payload'] except ValueError: c = raw_jws.split('.') @@ -331,6 +399,7 @@ p = base64url_decode(str(c[0])) if len(p) > 0: o['protected'] = p.decode('utf-8') + self._deserialize_b64(o, o['protected']) o['payload'] = base64url_decode(str(c[1])) o['signature'] = base64url_decode(str(c[2])) @@ -353,7 +422,8 @@ :param potected: The Protected Header (optional) :param header: The Unprotected Header (optional) - :raises InvalidJWSObject: if no payload has been set on the object. + :raises InvalidJWSObject: if no payload has been set on the object, + or invalid headers are provided. :raises ValueError: if the key is not a :class:`JWK` object. :raises ValueError: if the algorithm is missing or is not provided by one of the headers. 
@@ -364,20 +434,36 @@ if not self.objects.get('payload', None): raise InvalidJWSObject('Missing Payload') + b64 = True + p = dict() if protected: if isinstance(protected, dict): - protected = json_encode(protected) - p = json_decode(protected) - # TODO: allow caller to specify list of headers it understands - if 'crit' in p: - self._check_crit(p['crit']) + p = protected + protected = json_encode(p) + else: + p = json_decode(protected) + # If b64 is present we must enforce criticality + if 'b64' in list(p.keys()): + crit = p.get('crit', []) + if 'b64' not in crit: + raise InvalidJWSObject('b64 header must always be critical') + b64 = p['b64'] + + if 'b64' in self.objects: + if b64 != self.objects['b64']: + raise InvalidJWSObject('Mixed b64 headers on signatures') + + h = None if header: if isinstance(header, dict): + h = header header = json_encode(header) - h = json_decode(header) - p = self._merge_headers(p, h) + else: + h = json_decode(header) + + p = self._merge_check_headers(p, h) if 'alg' in p: if alg is None: @@ -416,6 +502,7 @@ self.objects['signatures'].append(o) else: self.objects.update(o) + self.objects['b64'] = b64 def serialize(self, compact=False): """Serializes the object into a JWS token. @@ -428,7 +515,6 @@ :raises InvalidJWSSignature: if no signature has been added to the object, or no valid signature can be found. """ - if compact: if 'signatures' in self.objects: raise InvalidJWSOperation("Can't use compact encoding with " @@ -441,23 +527,40 @@ protected = base64url_encode(self.objects['protected']) else: protected = '' - return '.'.join([protected, - base64url_encode(self.objects['payload']), + if self.objects.get('payload', False): + if self.objects.get('b64', True): + payload = base64url_encode(self.objects['payload']) + else: + if isinstance(self.objects['payload'], bytes): + payload = self.objects['payload'].decode('utf-8') + else: + payload = self.objects['payload'] + if '.' 
in payload: + raise InvalidJWSOperation( + "Can't use compact encoding with unencoded " + "payload that uses the . character") + else: + payload = '' + return '.'.join([protected, payload, base64url_encode(self.objects['signature'])]) else: obj = self.objects + sig = dict() + if self.objects.get('payload', False): + if self.objects.get('b64', True): + sig['payload'] = base64url_encode(self.objects['payload']) + else: + sig['payload'] = self.objects['payload'] if 'signature' in obj: if not obj.get('valid', False): raise InvalidJWSSignature("No valid signature found") - sig = {'payload': base64url_encode(obj['payload']), - 'signature': base64url_encode(obj['signature'])} + sig['signature'] = base64url_encode(obj['signature']) if 'protected' in obj: sig['protected'] = base64url_encode(obj['protected']) if 'header' in obj: sig['header'] = obj['header'] elif 'signatures' in obj: - sig = {'payload': base64url_encode(obj['payload']), - 'signatures': list()} + sig['signatures'] = list() for o in obj['signatures']: if not o.get('valid', False): continue @@ -481,24 +584,27 @@ raise InvalidJWSOperation("Payload not verified") return self.objects['payload'] + def detach_payload(self): + self.objects.pop('payload', None) + @property def jose_header(self): obj = self.objects if 'signature' in obj: - jh = dict() if 'protected' in obj: p = json_decode(obj['protected']) - jh = self._merge_headers(jh, p) - jh = self._merge_headers(jh, obj.get('header', dict())) - return jh + else: + p = None + return self._merge_check_headers(p, obj.get('header', dict())) elif 'signatures' in self.objects: jhl = list() for o in obj['signatures']: jh = dict() - if 'protected' in obj: + if 'protected' in o: p = json_decode(o['protected']) - jh = self._merge_headers(jh, p) - jh = self._merge_headers(jh, o.get('header', dict())) + else: + p = None + jh = self._merge_check_headers(p, o.get('header', dict())) jhl.append(jh) return jhl else: diff -Nru python-jwcrypto-0.4.2/jwcrypto/jwt.py 
python-jwcrypto-0.6.0/jwcrypto/jwt.py --- python-jwcrypto-0.4.2/jwcrypto/jwt.py 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/jwcrypto/jwt.py 2018-11-05 17:14:47.000000000 +0200 @@ -5,7 +5,7 @@ from six import string_types -from jwcrypto.common import json_decode, json_encode +from jwcrypto.common import JWException, json_decode, json_encode from jwcrypto.jwe import JWE from jwcrypto.jwk import JWK, JWKSet from jwcrypto.jws import JWS @@ -22,7 +22,7 @@ 'jti': 'JWT ID'} -class JWTExpired(Exception): +class JWTExpired(JWException): """Json Web Token is expired. This exception is raised when a token is expired accoring to its claims. @@ -39,7 +39,7 @@ super(JWTExpired, self).__init__(msg) -class JWTNotYetValid(Exception): +class JWTNotYetValid(JWException): """Json Web Token is not yet valid. This exception is raised when a token is not valid yet according to its @@ -57,7 +57,7 @@ super(JWTNotYetValid, self).__init__(msg) -class JWTMissingClaim(Exception): +class JWTMissingClaim(JWException): """Json Web Token claim is invalid. This exception is raised when a claim does not match the expected value. @@ -74,7 +74,7 @@ super(JWTMissingClaim, self).__init__(msg) -class JWTInvalidClaimValue(Exception): +class JWTInvalidClaimValue(JWException): """Json Web Token claim is invalid. This exception is raised when a claim does not match the expected value. @@ -91,7 +91,7 @@ super(JWTInvalidClaimValue, self).__init__(msg) -class JWTInvalidClaimFormat(Exception): +class JWTInvalidClaimFormat(JWException): """Json Web Token claim format is invalid. This exception is raised when a claim is not in a valid format. @@ -108,7 +108,7 @@ super(JWTInvalidClaimFormat, self).__init__(msg) -class JWTMissingKeyID(Exception): +class JWTMissingKeyID(JWException): """Json Web Token is missing key id. 
This exception is raised when trying to decode a JWT with a key set @@ -126,7 +126,7 @@ super(JWTMissingKeyID, self).__init__(msg) -class JWTMissingKey(Exception): +class JWTMissingKey(JWException): """Json Web Token is using a key not in the key set. This exception is raised if the key that was used is not available @@ -155,15 +155,15 @@ """Creates a JWT object. :param header: A dict or a JSON string with the JWT Header data. - :param claims: A dict or a string withthe JWT Claims data. + :param claims: A dict or a string with the JWT Claims data. :param jwt: a 'raw' JWT token :param key: A (:class:`jwcrypto.jwk.JWK`) key to deserialize - the token. A (:class:`jwcrypt.jwk.JWKSet`) can also be used. + the token. A (:class:`jwcrypto.jwk.JWKSet`) can also be used. :param algs: An optional list of allowed algorithms :param default_claims: An optional dict with default values for registred claims. A None value for NumericDate type claims will cause generation according to system time. Only the values - fro RFC 7519 - 4.1 are evaluated. + from RFC 7519 - 4.1 are evaluated. :param check_claims: An optional dict of claims that must be present in the token, if the value is not None the claim must match exactly. @@ -212,9 +212,15 @@ @header.setter def header(self, h): if isinstance(h, dict): - self._header = json_encode(h) + eh = json_encode(h) else: - self._header = h + eh = h + h = json_decode(eh) + + if h.get('b64') is False: + raise ValueError("b64 header is invalid." 
+ "JWTs cannot use unencoded payloads") + self._header = eh @property def claims(self): @@ -224,6 +230,10 @@ @claims.setter def claims(self, c): + if self._reg_claims and not isinstance(c, dict): + # decode c so we can set default claims + c = json_decode(c) + if isinstance(c, dict): self._add_default_claims(c) self._claims = json_encode(c) @@ -276,7 +286,7 @@ def _add_jti_claim(self, claims): if 'jti' in claims or 'jti' not in self._reg_claims: return - claims['jti'] = uuid.uuid4() + claims['jti'] = str(uuid.uuid4()) def _add_default_claims(self, claims): if self._reg_claims is None: @@ -380,8 +390,8 @@ if value in claims[name]: continue raise JWTInvalidClaimValue( - "Invalid '%s' value. Expected '%s' in '%s'" % ( - name, value, claims[name])) + "Invalid '%s' value. Expected '%s' to be in '%s'" % ( + name, claims[name], value)) elif name == 'exp': if value is not None: @@ -398,7 +408,7 @@ else: if value is not None and value != claims[name]: raise JWTInvalidClaimValue( - "Invalid '%s' value. Expected '%d' got '%d'" % ( + "Invalid '%s' value. Expected '%s' got '%s'" % ( name, value, claims[name])) def make_signed_token(self, key): @@ -437,7 +447,7 @@ :param jwt: a 'raw' JWT token. :param key: A (:class:`jwcrypto.jwk.JWK`) verification or - decryption key, or a (:class:`jwcrypt.jwk.JWKSet`) that + decryption key, or a (:class:`jwcrypto.jwk.JWKSet`) that contains a key indexed by the 'kid' header. 
""" c = jwt.count('.') diff -Nru python-jwcrypto-0.4.2/jwcrypto/tests.py python-jwcrypto-0.6.0/jwcrypto/tests.py --- python-jwcrypto-0.4.2/jwcrypto/tests.py 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/jwcrypto/tests.py 2018-11-05 17:14:47.000000000 +0200 @@ -3,7 +3,6 @@ from __future__ import unicode_literals import copy - import unittest from cryptography.hazmat.backends import default_backend @@ -312,11 +311,17 @@ self.assertRaises(jwk.InvalidJWKValue, jwk.JWK.from_pyca, dict()) + def test_jwk_from_json(self): + k = jwk.JWK.generate(kty='oct', size=256) + y = jwk.JWK.from_json(k.export()) + self.assertEqual(k.export(), y.export()) + def test_jwkset(self): k = jwk.JWK(**RSAPrivateKey) ks = jwk.JWKSet() ks.add(k) - ks2 = jwk.JWKSet().import_keyset(ks.export()) + ks2 = jwk.JWKSet() + ks2.import_keyset(ks.export()) self.assertEqual(len(ks), len(ks2)) self.assertEqual(len(ks), 1) k1 = ks.get_key(RSAPrivateKey['kid']) @@ -329,6 +334,15 @@ ks3 = jwk.JWKSet.from_json(ks.export()) self.assertEqual(len(ks), len(ks3)) + # Test Keyset with mutiple keys + ksm = jwk.JWKSet.from_json(json_encode(PrivateKeys)) + num = 0 + for item in ksm: + self.assertTrue(isinstance(item, jwk.JWK)) + self.assertTrue(item in ksm) + num += 1 + self.assertEqual(num, len(PrivateKeys['keys'])) + def test_thumbprint(self): for i in range(0, len(PublicKeys['keys'])): k = jwk.JWK(**PublicKeys['keys'][i]) @@ -378,6 +392,25 @@ self.assertFalse(pubkey.has_private) self.assertEqual(prikey.key_id, pubkey.key_id) + def test_public(self): + key = jwk.JWK.from_pem(RSAPrivatePEM, password=RSAPrivatePassword) + self.assertTrue(key.has_public) + self.assertTrue(key.has_private) + pubkey = key.public() + self.assertTrue(pubkey.has_public) + self.assertFalse(pubkey.has_private) + # finally check public works + e = jwe.JWE('plaintext', '{"alg":"RSA-OAEP","enc":"A256GCM"}') + e.add_recipient(pubkey) + enc = e.serialize() + d = jwe.JWE() + d.deserialize(enc, key) + self.assertEqual(d.payload, 
b'plaintext') + + def test_invalid_value(self): + with self.assertRaises(jwk.InvalidJWKValue): + jwk.JWK(kty='oct', k=b'\x01') + # RFC 7515 - A.1 A1_protected = \ @@ -556,7 +589,11 @@ 'key2': jwk.JWK(**A3_key), 'protected2': bytes(bytearray(A3_protected)).decode('utf-8'), 'header2': json_encode({"kid": "e9bc097a-ce51-4036-9562-d2ade882db0d"}), - 'serialized': A6_serialized} + 'serialized': A6_serialized, + 'jose_header': [{"kid": "2010-12-29", + "alg": "RS256"}, + {"kid": "e9bc097a-ce51-4036-9562-d2ade882db0d", + "alg": "ES256"}]} A7_example = \ '{' + \ @@ -630,6 +667,7 @@ sig = s.serialize() s.deserialize(sig, A6_example['key1']) s.deserialize(A6_serialized, A6_example['key2']) + self.assertEqual(A6_example['jose_header'], s.jose_header) def test_A7(self): s = jws.JWS(A6_example['payload']) @@ -801,6 +839,29 @@ '"ciphertext":"KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY",' \ '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' +Issue_136_Protected_Header_no_epk = { + "alg": "ECDH-ES+A256KW", + "enc": "A256CBC-HS512"} + +Issue_136_Contributed_JWE = \ + "eyJhbGciOiJFQ0RILUVTK0ExMjhLVyIsImVuYyI6IkEyNTZDQkMtSFM1MTIiLCJr" \ + "aWQiOiJrZXkxIiwiZXBrIjp7Imt0eSI6IkVDIiwiY3J2IjoiUC0yNTYiLCJ4Ijoi" \ + "cDNpU241cEFSNUpYUE5aVF9SSEw2MTJMUGliWEI2WDhvTE9EOXFrN2NhTSIsInki" \ + "OiI1Y04yQ2FqeXM3SVlDSXFEby1QUHF2bVQ1RzFvMEEtU0JicEQ5NFBOb3NNIn19" \ + ".wG51hYE_Vma8tvFKVyeZs4lsHhXiarEw3-59eWHPmhRflDAKrMvnBw1urezo_Bz" \ + "ZyPJ76m42ORQPbhEu5NvbJk3vgdgcp03j" \ + ".lRttW8r6P6zM0uYDQt0EjQ.qnOnz7biCbqdLEdUH3acMamFm-cBRCSTFb83tNPrgDU" \ + ".vZnwYpYjzrTaYritwMzaguaAMsq9rQOWe8NUHICv2hg" + +Issue_136_Contributed_Key = { + "alg": "ECDH-ES+A128KW", + "crv": "P-256", + "d": "F2PnliYin65AoIUxL1CwwzBPNeL2TyZPAKtkXOP50l8", + "kid": "key1", + "kty": "EC", + "x": "FPrb_xwxe8SBP3kO-e-WsofFp7n5-yc_tGgfAvqAP8g", + "y": "lM3HuyKMYUVsYdGqiWlkwTZbGO3Fh-hyadq8lfkTgBc"} + class TestJWE(unittest.TestCase): def check_enc(self, plaintext, protected, key, vector): @@ -843,6 +904,40 @@ e = jwe.JWE(algs=['A256KW']) 
e.deserialize(E_A5_ex, E_A4_ex['key2']) + def test_compact_protected_header(self): + """Compact representation requires a protected header""" + e = jwe.JWE(E_A1_ex['plaintext']) + e.add_recipient(E_A1_ex['key'], E_A1_ex['protected']) + + with self.assertRaises(jwe.InvalidJWEOperation): + e.serialize(compact=True) + + def test_compact_invalid_header(self): + with self.assertRaises(jwe.InvalidJWEOperation): + e = jwe.JWE(E_A1_ex['plaintext'], E_A1_ex['protected'], + aad='XYZ', recipient=E_A1_ex['key']) + e.serialize(compact=True) + + with self.assertRaises(jwe.InvalidJWEOperation): + e = jwe.JWE(E_A1_ex['plaintext'], E_A1_ex['protected'], + unprotected='{"jku":"https://example.com/keys.jwks"}', + recipient=E_A1_ex['key']) + e.serialize(compact=True) + + def test_JWE_Issue_136(self): + plaintext = "plain" + protected = json_encode(Issue_136_Protected_Header_no_epk) + key = jwk.JWK.generate(kty='EC', crv='P-521') + e = jwe.JWE(plaintext, protected) + e.add_recipient(key) + enc = e.serialize() + e.deserialize(enc, key) + self.assertEqual(e.payload, plaintext.encode('utf-8')) + + e = jwe.JWE() + e.deserialize(Issue_136_Contributed_JWE, + jwk.JWK(**Issue_136_Contributed_Key)) + MMA_vector_key = jwk.JWK(**E_A2_key) MMA_vector_ok_cek = \ @@ -1018,6 +1113,39 @@ keyset.add(key) jwt.JWT(jwt=token, key=keyset, check_claims={'exp': 1300819380}) + def test_invalid_claim_type(self): + key = jwk.JWK(**E_A2_key) + claims = {"testclaim": "test"} + claims.update(A1_claims) + t = jwt.JWT(A1_header, claims) + t.make_encrypted_token(key) + token = t.serialize() + + # Wrong string + self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token, + key=key, check_claims={"testclaim": "ijgi"}) + + # Wrong type + self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token, + key=key, check_claims={"testclaim": 123}) + + # Correct + jwt.JWT(jwt=token, key=key, check_claims={"testclaim": "test"}) + + def test_claim_params(self): + key = jwk.JWK(**E_A2_key) + default_claims = {"iss": "test", 
"exp": None} + string_claims = '{"string_claim":"test"}' + string_header = '{"alg":"RSA1_5","enc":"A128CBC-HS256"}' + t = jwt.JWT(string_header, string_claims, + default_claims=default_claims) + t.make_encrypted_token(key) + token = t.serialize() + + # Check default_claims + jwt.JWT(jwt=token, key=key, check_claims={"iss": "test", "exp": None, + "string_claim": "test"}) + class ConformanceTests(unittest.TestCase): @@ -1148,3 +1276,57 @@ self.assertEqual(inst.name, name) else: self.fail((name, cls)) + + +# RFC 7797 + +rfc7797_e_header = '{"alg":"HS256"}' +rfc7797_u_header = '{"alg":"HS256","b64":false,"crit":["b64"]}' +rfc7797_payload = "$.02" + + +class TestUnencodedPayload(unittest.TestCase): + + def test_regular(self): + result = \ + 'eyJhbGciOiJIUzI1NiJ9.JC4wMg.' + \ + '5mvfOroL-g7HyqJoozehmsaqmvTYGEq5jTI1gVvoEoQ' + + s = jws.JWS(rfc7797_payload) + s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]), + protected=rfc7797_e_header) + sig = s.serialize(compact=True) + self.assertEqual(sig, result) + + def test_compat_unencoded(self): + result = \ + 'eyJhbGciOiJIUzI1NiIsImI2NCI6ZmFsc2UsImNyaXQiOlsiYjY0Il19..' 
+ \ + 'A5dxf2s96_n5FLueVuW1Z_vh161FwXZC4YLPff6dmDY' + + s = jws.JWS(rfc7797_payload) + s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]), + protected=rfc7797_u_header) + # check unencoded payload is in serialized form + sig = s.serialize() + self.assertEqual(json_decode(sig)['payload'], rfc7797_payload) + # check error raises if we try to get compact serialization + with self.assertRaises(jws.InvalidJWSOperation): + sig = s.serialize(compact=True) + # check compact serialization is allowed with detached payload + s.detach_payload() + sig = s.serialize(compact=True) + self.assertEqual(sig, result) + + def test_misses_crit(self): + s = jws.JWS(rfc7797_payload) + with self.assertRaises(jws.InvalidJWSObject): + s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]), + protected={"alg": "HS256", "b64": False}) + + def test_mismatching_encoding(self): + s = jws.JWS(rfc7797_payload) + s.add_signature(jwk.JWK(**SymmetricKeys['keys'][0]), + protected=rfc7797_e_header) + with self.assertRaises(jws.InvalidJWSObject): + s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]), + protected=rfc7797_u_header) diff -Nru python-jwcrypto-0.4.2/Makefile python-jwcrypto-0.6.0/Makefile --- python-jwcrypto-0.4.2/Makefile 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/Makefile 2018-11-05 17:14:47.000000000 +0200 @@ -21,13 +21,15 @@ testlong: export TOX_TESTENV_PASSENV=JWCRYPTO_TESTS_ENABLE_MMA testlong: rm -f .coverage - tox -e py35 + tox -e py36 test: rm -f .coverage tox -e py27 tox -e py34 --skip-missing-interpreter tox -e py35 --skip-missing-interpreter + tox -e py36 --skip-missing-interpreter + tox -e py37 --skip-missing-interpreter DOCS_DIR = docs .PHONY: docs diff -Nru python-jwcrypto-0.4.2/README.md python-jwcrypto-0.6.0/README.md --- python-jwcrypto-0.4.2/README.md 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/README.md 2018-11-05 17:14:47.000000000 +0200 @@ -1,3 +1,5 @@ +[![Build 
Status](https://travis-ci.org/latchset/jwcrypto.svg?branch=master)](https://travis-ci.org/latchset/jwcrypto) + JWCrypto ======== diff -Nru python-jwcrypto-0.4.2/setup.py python-jwcrypto-0.6.0/setup.py --- python-jwcrypto-0.4.2/setup.py 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/setup.py 2018-11-05 17:14:47.000000000 +0200 @@ -6,7 +6,7 @@ setup( name = 'jwcrypto', - version = '0.4.2', + version = '0.6.0', license = 'LGPLv3+', maintainer = 'JWCrypto Project Contributors', maintainer_email = 's...@redhat.com', @@ -18,6 +18,7 @@ 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', 'Intended Audience :: Developers', 'Topic :: Security', 'Topic :: Software Development :: Libraries :: Python Modules' diff -Nru python-jwcrypto-0.4.2/tox.ini python-jwcrypto-0.6.0/tox.ini --- python-jwcrypto-0.4.2/tox.ini 2017-08-01 18:56:23.000000000 +0300 +++ python-jwcrypto-0.6.0/tox.ini 2018-11-05 17:14:47.000000000 +0200 @@ -1,5 +1,5 @@ [tox] -envlist = lint,py27,py34,py35,py36,pep8py2,pep8py3,doc,sphinx +envlist = lint,py27,py34,py35,py36,py37,pep8py2,pep8py3,doc,sphinx skip_missing_interpreters = true [testenv] unblock python-jwcrypto/0.6.0-1