laforge has submitted this change. ( 
https://gerrit.osmocom.org/c/pysim/+/41450?usp=email )

Change subject: card_key_provider: separate and refactor CSV column encryption
......................................................................

card_key_provider: separate and refactor CSV column encryption

The CardKeyProviderCsv class implements a column decryption scheme
where columns are protected using a transport key. The CSV files
are encrypted using contrib/csv-encrypt-columns.py.

The current implementation has two main problems:

- The decryption code in CardKeyProviderCsv is not specific to CSV files.
  It could be re-used in other formats, for example to decrypt columns
  (fields) read from a database. So let's split the decryption code into a
  separate class.

- The encryption code in csv-encrypt-columns.py accesses methods and
  properties in CardKeyProviderCsv. Also, having the corresponding
  encryption code somewhere out of tree may be confusing. Let's improve
  the design and put encryption and decryption functions in a single
  class. Let's also make sure the encryption/decryption is covered by
  unit tests.

Related: SYS#7725
Change-Id: I180457d4938f526d227c81020e4e03c6b3a57dab
---
M contrib/csv-encrypt-columns.py
M pySim/card_key_provider.py
M tests/unittests/test_card_key_provider.py
3 files changed, 165 insertions(+), 60 deletions(-)

Approvals:
  laforge: Looks good to me, approved
  Jenkins Builder: Verified




diff --git a/contrib/csv-encrypt-columns.py b/contrib/csv-encrypt-columns.py
index 49340a2..9967005 100755
--- a/contrib/csv-encrypt-columns.py
+++ b/contrib/csv-encrypt-columns.py
@@ -24,20 +24,12 @@
 from Cryptodome.Cipher import AES
 from osmocom.utils import h2b, b2h, Hexstr

-from pySim.card_key_provider import CardKeyProviderCsv
+from pySim.card_key_provider import CardKeyFieldCryptor

-def dict_keys_to_upper(d: dict) -> dict:
-    return {k.upper():v for k,v in d.items()}
-
-class CsvColumnEncryptor:
+class CsvColumnEncryptor(CardKeyFieldCryptor):
     def __init__(self, filename: str, transport_keys: dict):
         self.filename = filename
-        self.transport_keys = dict_keys_to_upper(transport_keys)
-
-    def encrypt_col(self, colname:str, value: str) -> Hexstr:
-        key = self.transport_keys[colname]
-        cipher = AES.new(h2b(key), AES.MODE_CBC, CardKeyProviderCsv.IV)
-        return b2h(cipher.encrypt(h2b(value)))
+        self.crypt = CardKeyFieldCryptor(transport_keys)

     def encrypt(self) -> None:
         with open(self.filename, 'r') as infile:
@@ -49,9 +41,8 @@
                 cw.writeheader()

                 for row in cr:
-                    for key_colname in self.transport_keys:
-                        if key_colname in row:
-                            row[key_colname] = self.encrypt_col(key_colname, 
row[key_colname])
+                    for fieldname in cr.fieldnames:
+                            row[fieldname] = 
self.crypt.encrypt_field(fieldname, row[fieldname])
                     cw.writerow(row)

 if __name__ == "__main__":
@@ -71,9 +62,5 @@
         print("You must specify at least one key!")
         sys.exit(1)

-    csv_column_keys = 
CardKeyProviderCsv.process_transport_keys(csv_column_keys)
-    for name, key in csv_column_keys.items():
-        print("Encrypting column %s using AES key %s" % (name, key))
-
     cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys)
     cce.encrypt()
diff --git a/pySim/card_key_provider.py b/pySim/card_key_provider.py
index 9f9dc70..561418f 100644
--- a/pySim/card_key_provider.py
+++ b/pySim/card_key_provider.py
@@ -10,7 +10,7 @@
 operation with pySim-shell.
 """

-# (C) 2021-2024 by Sysmocom s.f.m.c. GmbH
+# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH
 # All Rights Reserved
 #
 # Author: Philipp Maier, Harald Welte
@@ -31,23 +31,104 @@
 from typing import List, Dict, Optional
 from Cryptodome.Cipher import AES
 from osmocom.utils import h2b, b2h
+from pySim.log import PySimLogger

 import abc
 import csv
+import logging
+
+log = PySimLogger.get("CARDKEY")

 card_key_providers = []  # type: List['CardKeyProvider']

-# well-known groups of columns relate to a given functionality.  This avoids 
having
-# to specify the same transport key N number of times, if the same key is used 
for multiple
-# fields of one group, like KIC+KID+KID of one SD.
-CRYPT_GROUPS = {
-    'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'],
-    'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'],
-    'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'],
-    'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'],
-    'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'],
+class CardKeyFieldCryptor:
+    """
+    A Card key field encryption class that may be used by Card key provider 
implementations to add support for
+    a column-based encryption to protect sensitive material (cryptographic key 
material, ADM keys, etc.).
+    The sensitive material is encrypted using a "key-encryption key", 
occasionally also known as "transport key"
+    before it is stored into a file or database (see also GSMA FS.28). The 
"transport key" is then used to decrypt
+    the key material on demand.
+    """
+
+    # well-known groups of columns relate to a given functionality.  This 
avoids having
+    # to specify the same transport key N number of times, if the same key is 
used for multiple
+    # fields of one group, like KIC+KID+KID of one SD.
+    __CRYPT_GROUPS = {
+            'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 
'UICC_SCP02_KIK1'],
+            'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 
'UICC_SCP03_KIK1'],
+            'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 
'SCP03_DEK_ISDR'],
+            'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 
'SCP03_DEK_ISDA'],
+            'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 
'SCP03_DEK_ECASD'],
     }

+    __IV = b'\x23' * 16
+
+    @staticmethod
+    def __dict_keys_to_upper(d: dict) -> dict:
+            return {k.upper():v for k,v in d.items()}
+
+    @staticmethod
+    def __process_transport_keys(transport_keys: dict, crypt_groups: dict):
+        """Apply a single transport key to multiple fields/columns, if the 
name is a group."""
+        new_dict = {}
+        for name, key in transport_keys.items():
+            if name in crypt_groups:
+                for field in crypt_groups[name]:
+                    new_dict[field] = key
+            else:
+                new_dict[name] = key
+        return new_dict
+
+    def __init__(self, transport_keys: dict):
+        """
+        Create new field encryptor/decryptor object and set transport keys, 
usually one for each column. In some cases
+        it is also possible to use a single key for multiple columns (see also 
__CRYPT_GROUPS)
+
+        Args:
+                transport_keys : a dict indexed by field name, whose values 
are hex-encoded AES keys for the
+                                 respective field (column) of the CSV. This is 
done so that different fields
+                                 (columns) can use different transport keys, 
which is strongly recommended by
+                                 GSMA FS.28
+        """
+        self.transport_keys = 
self.__process_transport_keys(self.__dict_keys_to_upper(transport_keys),
+                                                            
self.__CRYPT_GROUPS)
+        for name, key in self.transport_keys.items():
+                log.debug("Encrypting/decrypting field %s using AES key %s" % 
(name, key))
+
+    def decrypt_field(self, field_name: str, encrypted_val: str) -> str:
+        """
+        Decrypt a single field. The decryption is only applied if we have a 
transport key is known under the provided
+        field name, otherwise the field is treated as plaintext and passed 
through as it is.
+
+        Args:
+                field_name : name of the field to decrypt (used to identify 
which key to use)
+                encrypted_val : encrypted field value
+
+        Returns:
+                plaintext field value
+        """
+        if not field_name.upper() in self.transport_keys:
+            return encrypted_val
+        cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), 
AES.MODE_CBC, self.__IV)
+        return b2h(cipher.decrypt(h2b(encrypted_val)))
+
+    def encrypt_field(self, field_name: str, plaintext_val: str) -> str:
+        """
+        Encrypt a single field. The encryption is only applied if we have a 
transport key is known under the provided
+        field name, otherwise the field is treated as non sensitive and passed 
through as it is.
+
+        Args:
+                field_name : name of the field to decrypt (used to identify 
which key to use)
+                encrypted_val : encrypted field value
+
+        Returns:
+                plaintext field value
+        """
+        if not field_name.upper() in self.transport_keys:
+            return plaintext_val
+        cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), 
AES.MODE_CBC, self.__IV)
+        return b2h(cipher.encrypt(h2b(plaintext_val)))
+
 class CardKeyProvider(abc.ABC):
     """Base class, not containing any concrete implementation."""

@@ -89,13 +170,9 @@
                 dictionary of {field, value} strings for each requested field 
from 'fields'
         """

-
 class CardKeyProviderCsv(CardKeyProvider):
-    """Card key provider implementation that allows to query against a 
specified CSV file.
-    Supports column-based encryption as it is generally a bad idea to store 
cryptographic key material in
-    plaintext.  Instead, the key material should be encrypted by a 
"key-encryption key", occasionally also
-    known as "transport key" (see GSMA FS.28)."""
-    IV = b'\x23' * 16
+    """Card key provider implementation that allows to query against a 
specified CSV file."""
+
     csv_file = None
     filename = None

@@ -103,35 +180,13 @@
         """
         Args:
                 filename : file name (path) of CSV file containing 
card-individual key/data
-                transport_keys : a dict indexed by field name, whose values 
are hex-encoded AES keys for the
-                                 respective field (column) of the CSV.  This 
is done so that different fields
-                                 (columns) can use different transport keys, 
which is strongly recommended by
-                                 GSMA FS.28
+                transport_keys : (see class CardKeyFieldCryptor)
         """
         self.csv_file = open(filename, 'r')
         if not self.csv_file:
             raise RuntimeError("Could not open CSV file '%s'" % filename)
         self.filename = filename
-        self.transport_keys = self.process_transport_keys(transport_keys)
-
-    @staticmethod
-    def process_transport_keys(transport_keys: dict):
-        """Apply a single transport key to multiple fields/columns, if the 
name is a group."""
-        new_dict = {}
-        for name, key in transport_keys.items():
-            if name in CRYPT_GROUPS:
-                for field in CRYPT_GROUPS[name]:
-                    new_dict[field] = key
-            else:
-                new_dict[name] = key
-        return new_dict
-
-    def _decrypt_field(self, field_name: str, encrypted_val: str) -> str:
-        """decrypt a single field, if we have a transport key for the field of 
that name."""
-        if not field_name in self.transport_keys:
-            return encrypted_val
-        cipher = AES.new(h2b(self.transport_keys[field_name]), AES.MODE_CBC, 
self.IV)
-        return b2h(cipher.decrypt(h2b(encrypted_val)))
+        self.crypt = CardKeyFieldCryptor(transport_keys)

     def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]:
         super()._verify_get_data(fields, key, value)
@@ -147,7 +202,7 @@
             if row[key] == value:
                 for f in fields:
                     if f in row:
-                        rc.update({f: self._decrypt_field(f, row[f])})
+                        rc.update({f: self.crypt.decrypt_field(f, row[f])})
                     else:
                         raise RuntimeError("CSV-File '%s' lacks column '%s'" % 
(self.filename, f))
         return rc
diff --git a/tests/unittests/test_card_key_provider.py 
b/tests/unittests/test_card_key_provider.py
index 03148e2..bbe76e8 100644
--- a/tests/unittests/test_card_key_provider.py
+++ b/tests/unittests/test_card_key_provider.py
@@ -4,7 +4,7 @@
 import os
 from pySim.card_key_provider import *

-class TestCardKeyProvider(unittest.TestCase):
+class TestCardKeyProviderCsv(unittest.TestCase):

     def __init__(self, *args, **kwargs):
         column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
@@ -76,5 +76,68 @@
             result = card_key_provider_get_field("KIC1", "ICCID", 
t.get('ICCID'))
             self.assertEqual(result, t.get('EXPECTED'))

+class TestCardKeyFieldCryptor(unittest.TestCase):
+
+    def __init__(self, *args, **kwargs):
+        transport_keys = {"KI" : "000424252525535532532A0B0C0D0E0F",
+                          "OPC" : "000102030405065545645645645D0E0F",
+                          "KIC1" : "06410203546406456456450B0C0D0E0F",
+                          "UICC_SCP03" : "00040267840507667609045645645E0F"}
+        self.crypt = CardKeyFieldCryptor(transport_keys)
+        super().__init__(*args, **kwargs)
+
+    def test_encrypt_field(self):
+        test_data = [{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "OPC"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "NOCRYPT"},
+                     {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "UICC_SCP03_KIC1"},
+                     {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "UICC_SCP03_KID1"},
+                     {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "UICC_SCP03_KIK1"},
+                     {'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "opc"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "nocrypt"},
+                     {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "uicc_scp03_kic1"},
+                     {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "uicc_scp03_kid1"},
+                     {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d",
+                      'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "uicc_scp03_kik1"}]
+
+        for t in test_data:
+            result = self.crypt.encrypt_field(t.get('FIELDNAME'), 
t.get('PLAINTEXT_VAL'))
+            self.assertEqual(result, t.get('EXPECTED'))
+
+    def test_decrypt_field(self):
+        test_data = [{'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 
'FIELDNAME' : "OPC"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "NOCRYPT"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 
'FIELDNAME' : "UICC_SCP03_KIC1"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 
'FIELDNAME' : "UICC_SCP03_KID1"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 
'FIELDNAME' : "UICC_SCP03_KIK1"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 
'FIELDNAME' : "opc"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 
'FIELDNAME' : "nocrypt"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 
'FIELDNAME' : "uicc_scp03_kic1"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 
'FIELDNAME' : "uicc_scp03_kid1"},
+                     {'EXPECTED' : "000102030405060708090a0b0c0d0e0f",
+                      'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 
'FIELDNAME' : "uicc_scp03_kik1"}]
+
+        for t in test_data:
+            result = self.crypt.decrypt_field(t.get('FIELDNAME'), 
t.get('ENCRYPTED_VAL'))
+            self.assertEqual(result, t.get('EXPECTED'))
+
+
 if __name__ == "__main__":
        unittest.main()

--
To view, visit https://gerrit.osmocom.org/c/pysim/+/41450?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: pysim
Gerrit-Branch: master
Gerrit-Change-Id: I180457d4938f526d227c81020e4e03c6b3a57dab
Gerrit-Change-Number: 41450
Gerrit-PatchSet: 8
Gerrit-Owner: dexter <[email protected]>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: fixeria <[email protected]>
Gerrit-Reviewer: laforge <[email protected]>

Reply via email to