Source code for jks.bks

# vim: set et ai ts=4 sts=4 sw=4:
import struct
import hashlib
from pyasn1.codec.ber import decoder
from pyasn1_modules import rfc5208, rfc2459
from Cryptodome.Hash import HMAC, SHA

from .util import *
from .jks import KeyStore, TrustedCertEntry
from . import rfc7292

ENTRY_TYPE_KEY = 2            # plaintext key entry as would otherwise be stored inside a sealed entry (type 4); no longer supported at the time of writing (BC 1.54)
ENTRY_TYPE_SECRET = 3         # for keys that were added to the store in already-protected form; can be arbitrary data
ENTRY_TYPE_SEALED = 4         # for keys that were protected by the BC keystore implementation upon adding
KEY_TYPE_PRIVATE = 0          #: Type indicator for private keys in :class:`BksKeyEntry`.
KEY_TYPE_PUBLIC = 1           #: Type indicator for public keys in :class:`BksKeyEntry`.
KEY_TYPE_SECRET = 2           #: Type indicator for secret keys in :class:`BksKeyEntry`. Indicates a key for use with a symmetric encryption algorithm.

class AbstractBksEntry(AbstractKeystoreEntry):
    """Abstract superclass for BKS keystore entry types"""
    def __init__(self, **kwargs):
        super(AbstractBksEntry, self).__init__(**kwargs)
        # All BKS entries can carry an arbitrary number of associated certificates
        self.cert_chain = kwargs.get("cert_chain", [])
        self._encrypted = kwargs.get("encrypted")

[docs]class BksTrustedCertEntry(TrustedCertEntry): """Represents a trusted certificate entry in a BKS or UBER keystore.""" pass # identical
[docs]class BksKeyEntry(AbstractBksEntry): """ Represents a non-encrypted cryptographic key (public, private or secret) stored in a BKS keystore. May exceptionally appear as a top-level entry type in (very) old keystores, but you are most likely to encounter these as the nested object inside a :class:`BksSealedKeyEntry` once decrypted. """ def __init__(self, type, format, algorithm, encoded, **kwargs): super(BksKeyEntry, self).__init__(**kwargs) self.type = type """An integer indicating the type of key: one of :const:`KEY_TYPE_PRIVATE`, :const:`KEY_TYPE_PUBLIC`, :const:`KEY_TYPE_SECRET`.""" self.format = format """A string indicating the format or encoding in which the key is stored. One of: ``PKCS8``, ``PKCS#8``, ``X.509``, ``X509``, ``RAW``.""" self.algorithm = algorithm """A string indicating the algorithm for which the key is valid.""" self.encoded = encoded """A byte string containing the key, formatted as indicated by the :attr:`format` attribute.""" if self.type == KEY_TYPE_PRIVATE: if self.format not in ["PKCS8", "PKCS#8"]: raise UnexpectedKeyEncodingException("Unexpected encoding for private key entry: '%s'" % self.format) # self.encoded is a PKCS#8 PrivateKeyInfo private_key_info = decoder.decode(self.encoded, asn1Spec=rfc5208.PrivateKeyInfo())[0] self.pkey_pkcs8 = self.encoded self.pkey = private_key_info['privateKey'].asOctets() self.algorithm_oid = private_key_info['privateKeyAlgorithm']['algorithm'].asTuple() elif self.type == KEY_TYPE_PUBLIC: if self.format not in ["X.509", "X509"]: raise UnexpectedKeyEncodingException("Unexpected encoding for public key entry: '%s'" % self.format) # self.encoded is an X.509 SubjectPublicKeyInfo spki = decoder.decode(self.encoded, asn1Spec=rfc2459.SubjectPublicKeyInfo())[0] self.public_key_info = self.encoded self.public_key = bitstring_to_bytes(spki['subjectPublicKey']) self.algorithm_oid = spki['algorithm']['algorithm'].asTuple() elif self.type == KEY_TYPE_SECRET: if self.format != "RAW": raise UnexpectedKeyEncodingException("Unexpected encoding for raw key entry: '%s'" % self.format) # self.encoded is an unwrapped/raw cryptographic key self.key = encoded self.key_size = len(encoded)*8 else: raise UnexpectedKeyEncodingException("Key format '%s' not recognized" % self.format)
[docs] def is_decrypted(self): """Always returns ``True`` for this entry type.""" return True
[docs] def decrypt(self, key_password): """Does nothing for this entry type; these entries are stored in non-encrypted form.""" pass
[docs] @classmethod def type2str(cls, t): """ Returns a string representation of the given key type. Returns one of ``PRIVATE``, ``PUBLIC`` or ``SECRET``, or ``None`` if no such key type is known. :param int t: Key type constant. One of :const:`KEY_TYPE_PRIVATE`, :const:`KEY_TYPE_PUBLIC`, :const:`KEY_TYPE_SECRET`. """ if t == KEY_TYPE_PRIVATE: return "PRIVATE" elif t == KEY_TYPE_PUBLIC: return "PUBLIC" elif t == KEY_TYPE_SECRET: return "SECRET" return None
[docs]class BksSecretKeyEntry(AbstractBksEntry): # TODO: consider renaming this to SecretValueEntry, since it's arbitrary secret data """ Conceptually similar to, but not to be confused with, :class:`BksKeyEntry` objects of type :const:`KEY_TYPE_SECRET`: - :class:`BksSecretKeyEntry` objects store the result of arbitrary user-supplied byte[]s, which, per the Java Keystore SPI, keystores are obligated to assume have already been protected by the user in some unspecified way. Because of this assumption, no password is provided for these entries when adding them to the keystore, and keystores are thus forced to store these bytes as-is. Produced by a call to ``KeyStore.setKeyEntry(String alias, byte[] key, Certificate[] chain)`` call. The bouncycastle project appears to have completely abandoned these entry types well over a decade ago now, and it is no longer possible to retrieve these entries through the Java APIs in any (remotely) recent BC version. - :class:`BksKeyEntry` objects of type :const:`KEY_TYPE_SECRET` store the result of a getEncoded() call on proper Java objects of type SecretKey. Produced by a call to ``KeyStore.setKeyEntry(String alias, Key key, char[] password, Certificate[] chain)``. The difference here is that the KeyStore implementation knows it's getting a proper (Secret)Key Java object, and can decide for itself how to store it given the password supplied by the user. I.e., in this version of setKeyEntry it is left up to the keystore implementation to encode and protect the supplied Key object, instead of in advance by the user. """ def __init__(self, **kwargs): super(BksSecretKeyEntry, self).__init__(**kwargs) self.key = self._encrypted """A byte string containing the secret key/value."""
[docs] def is_decrypted(self): """Always returns ``True`` for this entry type.""" return True
[docs] def decrypt(self, key_password): """Does nothing for this entry type; these entries stored arbitrary user-supplied data, unclear how to decrypt (may not be encrypted at all).""" pass
[docs]class BksSealedKeyEntry(AbstractBksEntry): """ PBEWithSHAAnd3-KeyTripleDES-CBC-encrypted wrapper around a :class:`BksKeyEntry`. The contained key type is unknown until decrypted. Once decrypted, objects of this type can be used in the same way as :class:`BksKeyEntry`: attribute accesses are forwarded to the wrapped :class:`BksKeyEntry` object. """ def __init__(self, **kwargs): super(BksSealedKeyEntry, self).__init__(**kwargs) self._nested = None # nested BksKeyEntry once decrypted def __getattr__(self, name): if not self.is_decrypted(): raise NotYetDecryptedException("Cannot access attribute '%s'; entry not yet decrypted, call decrypt() with the correct password first" % name) # if it's an attribute that exists here, return it; otherwise forward the request to the nested entry if "_"+name in self.__dict__: return self.__dict__["_"+name] else: return getattr(self._nested, name)
[docs] def is_decrypted(self): return (not self._encrypted)
[docs] def decrypt(self, key_password): if self.is_decrypted(): return pos = 0 data = self._encrypted salt, pos = BksKeyStore._read_data(data, pos) iteration_count = b4.unpack_from(data, pos)[0]; pos += 4 encrypted_blob = data[pos:] # The intention of the BKS entry decryption routine in BcKeyStoreSpi.StoreEntry.getObject(char[] password) appears to be: # - try to decrypt with "PBEWithSHAAnd3-KeyTripleDES-CBC" first (1.2.840.113549.; # - if that fails, try again with "BrokenPBEWithSHAAnd3-KeyTripleDES-CBC"; # - if that still fails, try again with "OldPBEWithSHAAnd3-KeyTripleDES-CBC" # - give up with an UnrecoverableKeyException # # However, at the time of writing (bcprov-jdk15on-1.53 and 1.54), the second and third cases can never successfully execute # because their implementation requests non-existent SecretKeyFactory objects for the Broken/Old algorithm names. # Inquiry through the BC developer mailing list tells us that this is indeed old functionality that has been retired long ago # and is not expected to be operational anymore, and should be cleaned up. # # So in practice, the real behaviour is: # - try to decrypt with "PBEWithSHAAnd3-KeyTripleDES-CBC" (1.2.840.113549.; # - give up with an UnrecoverableKeyException # # Implementation classes: # PBEWithSHAAnd3-KeyTripleDES-CBC -> org.bouncycastle.jcajce.provider.symmetric.DESede$PBEWithSHAAndDES3Key # BrokenPBEWithSHAAnd3-KeyTripleDES-CBC -> org.bouncycastle.jce.provider.BrokenJCEBlockCipher$BrokePBEWithSHAAndDES3Key # OldPBEWithSHAAnd3-KeyTripleDES-CBC -> org.bouncycastle.jce.provider.BrokenJCEBlockCipher$OldPBEWithSHAAndDES3Key # try: decrypted = rfc7292.decrypt_PBEWithSHAAnd3KeyTripleDESCBC(encrypted_blob, key_password, salt, iteration_count) except BadDataLengthException: raise BadKeystoreFormatException("Bad BKS entry format: %s" % str(e)) except BadPaddingException: raise DecryptionFailureException("Failed to decrypt data for key '%s'; wrong password?" % self.alias) # the plaintext content of a SealedEntry is a KeyEntry key_entry, dummy = BksKeyStore._read_bks_key(decrypted, 0, self.store_type) key_entry.store_type = self.store_type key_entry.cert_chain = self.cert_chain key_entry.alias = self.alias key_entry.timestamp = self.timestamp self._nested = key_entry self._encrypted = None
decrypt.__doc__ = AbstractBksEntry.decrypt.__doc__ is_decrypted.__doc__ = AbstractBksEntry.is_decrypted.__doc__
[docs]class BksKeyStore(AbstractKeystore): """ Bouncycastle "BKS" keystore parser. Supports both the current V2 and old V1 formats. """ def __init__(self, store_type, entries, version=2): super(BksKeyStore, self).__init__(store_type, entries) self.version = version """Version of the keystore format, if loaded.""" @property def certs(self): """A subset of the :attr:`entries` dictionary, filtered down to only those entries of type :class:`BksTrustedCertEntry`.""" return dict([(a, e) for a, e in self.entries.items() if isinstance(e, BksTrustedCertEntry)]) @property def secret_keys(self): """A subset of the :attr:`entries` dictionary, filtered down to only those entries of type :class:`BksSecretKeyEntry`.""" return dict([(a, e) for a, e in self.entries.items() if isinstance(e, BksSecretKeyEntry)]) @property def sealed_keys(self): """A subset of the :attr:`entries` dictionary, filtered down to only those entries of type :class:`BksSealedKeyEntry`.""" return dict([(a, e) for a, e in self.entries.items() if isinstance(e, BksSealedKeyEntry)]) @property def plain_keys(self): """A subset of the :attr:`entries` dictionary, filtered down to only those entries of type :class:`BksKeyEntry`.""" return dict([(a, e) for a, e in self.entries.items() if isinstance(e, BksKeyEntry)])
[docs] @classmethod def loads(cls, data, store_password, try_decrypt_keys=True): """ See :meth:`jks.jks.KeyStore.loads`. :param bytes data: Byte string representation of the keystore to be loaded. :param str password: Keystore password string :param bool try_decrypt_keys: Whether to automatically try to decrypt any encountered key entries using the same password as the keystore password. :returns: A loaded :class:`BksKeyStore` instance, if the keystore could be successfully parsed and the supplied store password is correct. If the ``try_decrypt_keys`` parameters was set to ``True``, any keys that could be successfully decrypted using the store password have already been decrypted; otherwise, no atttempt to decrypt any key entries is made. :raises BadKeystoreFormatException: If the keystore is malformed in some way :raises UnsupportedKeystoreVersionException: If the keystore contains an unknown format version number :raises KeystoreSignatureException: If the keystore signature could not be verified using the supplied store password :raises DuplicateAliasException: If the keystore contains duplicate aliases """ try: pos = 0 version = b4.unpack_from(data, pos)[0]; pos += 4 if version not in [1,2]: raise UnsupportedKeystoreVersionException("Unsupported BKS keystore version; only V1 and V2 supported, found v"+repr(version)) salt, pos = cls._read_data(data, pos) iteration_count = b4.unpack_from(data, pos)[0]; pos += 4 store_type = "bks" entries, size = cls._load_bks_entries(data[pos:], store_type, store_password, try_decrypt_keys=try_decrypt_keys) hmac_fn = hashlib.sha1 hmac_digest_size = hmac_fn().digest_size hmac_key_size = hmac_digest_size*8 if version != 1 else hmac_digest_size hmac_key = rfc7292.derive_key(hmac_fn, rfc7292.PURPOSE_MAC_MATERIAL, store_password, salt, iteration_count, hmac_key_size//8) store_data = data[pos:pos+size] store_hmac = data[pos+size:pos+size+hmac_digest_size] if len(store_hmac) != hmac_digest_size: raise BadKeystoreFormatException("Bad HMAC size; found %d bytes, expected %d bytes" % (len(store_hmac), hmac_digest_size)) hmac =, digestmod=SHA) hmac.update(store_data) computed_hmac = hmac.digest() if store_hmac != computed_hmac: raise KeystoreSignatureException("Hash mismatch; incorrect keystore password?") return cls(store_type, entries, version=version) except struct.error as e: raise BadKeystoreFormatException(e)
@classmethod def _load_bks_entries(cls, data, store_type, store_password, try_decrypt_keys=False): entries = {} pos = 0 while pos < len(data): _type = b1.unpack_from(data, pos)[0]; pos += 1 if _type == 0: break alias, pos = cls._read_utf(data, pos, kind="entry alias") timestamp = int(b8.unpack_from(data, pos)[0]); pos += 8 chain_length = b4.unpack_from(data, pos)[0]; pos += 4 cert_chain = [] for n in range(chain_length): entry, pos = cls._read_bks_cert(data, pos, store_type) cert_chain.append(entry) if _type == 1: # certificate entry, pos = cls._read_bks_cert(data, pos, store_type) elif _type == 2: # key: plaintext key entry, i.e. same as sealed key but without the PBEWithSHAAnd3KeyTripleDESCBC layer entry, pos = cls._read_bks_key(data, pos, store_type) elif _type == 3: # secret key: opaque arbitrary data blob, stored as-is by the keystore; can be anything (assumed to already be protected when supplied). entry, pos = cls._read_bks_secret(data, pos, store_type) elif _type == 4: # sealed key; a well-formatted certificate, private key or public key, encrypted by the BKS implementation with a standard algorithm at save time entry, pos = cls._read_bks_sealed(data, pos, store_type) else: raise BadKeystoreFormatException("Unexpected keystore entry type %d", tag) entry.alias = alias entry.timestamp = timestamp entry.cert_chain = cert_chain if try_decrypt_keys: try: entry.decrypt(store_password) except DecryptionFailureException: pass # ok, let user call .decrypt() manually afterwards if alias in entries: raise DuplicateAliasException("Found duplicate alias '%s'" % alias) entries[alias] = entry return (entries, pos) @classmethod def _read_bks_cert(cls, data, pos, store_type): cert_type, pos = cls._read_utf(data, pos, kind="certificate type") cert_data, pos = cls._read_data(data, pos) entry = BksTrustedCertEntry(type=cert_type, cert=cert_data, store_type=store_type) return entry, pos @classmethod def _read_bks_key(cls, data, pos, store_type): """Given a data stream, attempt to parse a stored BKS key entry at the given position, and return it as a BksKeyEntry.""" key_type = b1.unpack_from(data, pos)[0]; pos += 1 key_format, pos = BksKeyStore._read_utf(data, pos, kind="key format") key_algorithm, pos = BksKeyStore._read_utf(data, pos, kind="key algorithm") key_enc, pos = BksKeyStore._read_data(data, pos) entry = BksKeyEntry(key_type, key_format, key_algorithm, key_enc, store_type=store_type) return entry, pos @classmethod def _read_bks_secret(cls, data, pos, store_type): secret_data, pos = cls._read_data(data, pos) entry = BksSecretKeyEntry(store_type=store_type, encrypted=secret_data) return entry, pos @classmethod def _read_bks_sealed(cls, data, pos, store_type): sealed_data, pos = cls._read_data(data, pos) entry = BksSealedKeyEntry(store_type=store_type, encrypted=sealed_data) return entry, pos
[docs]class UberKeyStore(BksKeyStore): """ BouncyCastle "UBER" keystore format parser. """
[docs] @classmethod def loads(cls, data, store_password, try_decrypt_keys=True): """ See :meth:`jks.jks.KeyStore.loads`. :param bytes data: Byte string representation of the keystore to be loaded. :param str password: Keystore password string :param bool try_decrypt_keys: Whether to automatically try to decrypt any encountered key entries using the same password as the keystore password. :returns: A loaded :class:`UberKeyStore` instance, if the keystore could be successfully parsed and the supplied store password is correct. If the ``try_decrypt_keys`` parameters was set to ``True``, any keys that could be successfully decrypted using the store password have already been decrypted; otherwise, no atttempt to decrypt any key entries is made. :raises BadKeystoreFormatException: If the keystore is malformed in some way :raises UnsupportedKeystoreVersionException: If the keystore contains an unknown format version number :raises KeystoreSignatureException: If the keystore signature could not be verified using the supplied store password :raises DecryptionFailureException: If the keystore contents could not be decrypted using the supplied store password :raises DuplicateAliasException: If the keystore contains duplicate aliases """ # Uber keystores contain the same entry data as BKS keystores, except they wrap it differently: # BKS = BKS_store || HMAC-SHA1(BKS_store) # UBER = PBEWithSHAAndTwofish-CBC(BKS_store || SHA1(BKS_store)) # # where BKS_store represents the entry format shared by both keystore types. # # The Twofish key size is 256 bits, the PBE key derivation scheme is that as outlined by PKCS#12 (RFC 7292), # and the padding scheme for the Twofish cipher is PKCS#7. try: pos = 0 version = b4.unpack_from(data, pos)[0]; pos += 4 if version != 1: raise UnsupportedKeystoreVersionException('Unsupported UBER keystore version; only v1 supported, found v'+repr(version)) salt, pos = cls._read_data(data, pos) iteration_count = b4.unpack_from(data, pos)[0]; pos += 4 encrypted_bks_store = data[pos:] try: decrypted = rfc7292.decrypt_PBEWithSHAAndTwofishCBC(encrypted_bks_store, store_password, salt, iteration_count) except BadDataLengthException as e: raise BadKeystoreFormatException("Bad UBER keystore format: %s" % str(e)) except BadPaddingException as e: raise DecryptionFailureException("Failed to decrypt UBER keystore: bad password?") # Note: we can assume that the hash must be present at the last 20 bytes of the decrypted data (i.e. without first # parsing through to see where the entry data actually ends), because valid UBER keystores generators should not put # any trailing bytes after the hash prior to encrypting. hash_fn = hashlib.sha1 hash_digest_size = hash_fn().digest_size bks_store = decrypted[:-hash_digest_size] bks_hash = decrypted[-hash_digest_size:] if len(bks_hash) != hash_digest_size: raise BadKeystoreFormatException("Insufficient signature bytes; found %d bytes, expected %d bytes" % (len(bks_hash), hash_digest_size)) if hash_fn(bks_store).digest() != bks_hash: raise KeystoreSignatureException("Hash mismatch; incorrect keystore password?") store_type = "uber" entries, size = cls._load_bks_entries(bks_store, store_type, store_password, try_decrypt_keys=try_decrypt_keys) return cls(store_type, entries, version=version) except struct.error as e: raise BadKeystoreFormatException(e)
def __init__(self, store_type, entries, version=1): super(UberKeyStore, self).__init__(store_type, entries, version=version) self.version = version # only here so Sphinx documents the field """Version of the keystore format, if loaded."""