pitrou commented on a change in pull request #10450:
URL: https://github.com/apache/arrow/pull/10450#discussion_r687640148
##########
File path: ci/scripts/python_wheel_manylinux_build.sh
##########
@@ -89,6 +90,7 @@ cmake \
-DARROW_ORC=${ARROW_ORC} \
-DARROW_PACKAGE_KIND="python-wheel-manylinux${MANYLINUX_VERSION}" \
-DARROW_PARQUET=${ARROW_PARQUET} \
+ -DPARQUET_REQUIRE_ENCRYPTION=${PARQUET_REQUIRE_ENCRYPTION} \
Review comment:
Same question here: did you check that this works?
##########
File path: ci/scripts/python_wheel_macos_build.sh
##########
@@ -104,6 +105,7 @@ cmake \
-DARROW_ORC=${ARROW_ORC} \
-DARROW_PACKAGE_KIND="python-wheel-macos" \
-DARROW_PARQUET=${ARROW_PARQUET} \
+ -DPARQUET_REQUIRE_ENCRYPTION=${PARQUET_REQUIRE_ENCRYPTION}
Review comment:
Are you just adding this blindly, or did you check that building the
macOS wheels still works with this option?
##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -1340,6 +1355,58 @@ cdef shared_ptr[ArrowWriterProperties]
_create_arrow_writer_properties(
return arrow_properties
+cdef ParquetCipher cipher_from_name(name):
+ name = name.upper()
+ if name == 'AES_GCM_V1':
+ return ParquetCipher_AES_GCM_V1
+ elif name == 'AES_GCM_CTR_V1':
+ return ParquetCipher_AES_GCM_CTR_V1
+ else:
+ raise ValueError('Invalid value for algorithm: {0}'.format(name))
+
+
+def cipher_to_name(ParquetCipher cipher):
Review comment:
This can be a `cdef` as well, IMHO.
##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -1452,3 +1522,415 @@ cdef class ParquetWriter(_Weakrefable):
return result
raise RuntimeError(
'file metadata is only available after writer close')
+
+cdef class EncryptionConfiguration(_Weakrefable):
+ cdef:
+ shared_ptr[CEncryptionConfiguration] configuration
+
+ # Avoid mistakingly creating attributes
+ __slots__ = ()
+
+ def __init__(self, footer_key, *, column_keys=None,
+ uniform_encryption=None, encryption_algorithm=None,
+ plaintext_footer=None, double_wrapping=None,
+ cache_lifetime=None, internal_key_material=None,
+ data_key_length_bits=None):
+ self.configuration.reset(
+ new CEncryptionConfiguration(tobytes(footer_key)))
+ if column_keys is not None:
+ self.column_keys = column_keys
+ if uniform_encryption is not None:
+ self.uniform_encryption = uniform_encryption
+ if encryption_algorithm is not None:
+ self.encryption_algorithm = encryption_algorithm
+ if plaintext_footer is not None:
+ self.plaintext_footer = plaintext_footer
+ if double_wrapping is not None:
+ self.double_wrapping = double_wrapping
+ if cache_lifetime is not None:
+ self.cache_lifetime = cache_lifetime
+ if internal_key_material is not None:
+ self.internal_key_material = internal_key_material
+ if data_key_length_bits is not None:
+ self.data_key_length_bits = data_key_length_bits
+
+ @property
+ def footer_key(self):
+ """ID of the master key for footer encryption/signing"""
+ return frombytes(self.configuration.get().footer_key)
+
+ @property
+ def column_keys(self):
+ """
+ List of columns to encrypt, with master key IDs (see HIVE-21848).
+ Format: "masterKeyID:colName,colName;masterKeyID:colName..."
Review comment:
What does this mean? It seems a dict is returned.
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = Fernet.generate_key()
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+)
+
+
+class InMemoryKmsClient(pq.KmsClient):
+ """This is a mock class implementation of KmsClient, built for testing
only.
+ """
+
+ def __init__(self, config):
+ """Create an InMemoryKmsClient instance."""
+ pq.KmsClient.__init__(self)
+ self.master_keys_map = config.custom_kms_conf
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ """Wrap key key_bytes with key identified by master_key_identifier.
+ The result contains nonce concatenated before the encrypted key."""
+ master_key = self.master_keys_map[master_key_identifier]
+ # Create a cipher object to encrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ encrypted_key = cipher.encrypt(key_bytes)
+ result = base64.b64encode(encrypted_key)
+ return result
+
+ def unwrap_key(self, wrapped_key, master_key_identifier):
+ """Unwrap wrapped_key with key identified by master_key_identifier"""
+ master_key = self.master_keys_map[master_key_identifier]
+ decoded_wrapped_key = base64.b64decode(wrapped_key)
+ # Create a cipher object to decrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ decrypted_key = cipher.decrypt(decoded_wrapped_key)
+ return decrypted_key
+
+
+def verify_file_encrypted(path):
+ """Verify that the file is encrypted by looking at its first 4 bytes.
+ If it's the magic string PARE
+ then this is a parquet with encrypted footer."""
+ with open(path, "rb") as file:
+ magic_str = file.read(4)
+ # Verify magic string for parquet with encrypted footer is PARE
+ assert(magic_str == b'PARE')
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_read(tempdir):
+ """Write an encrypted parquet, verify it's encrypted, and then read it."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ result_table = read_encrypted_parquet(
+ path, decryption_config, kms_connection_config, crypto_factory)
+ assert table.equals(result_table)
+
+
+def write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory):
+ file_encryption_properties = crypto_factory.file_encryption_properties(
+ kms_connection_config, encryption_config)
+ assert(file_encryption_properties is not None)
+ with pq.ParquetWriter(
+ path, table.schema,
+ encryption_properties=file_encryption_properties) as writer:
+ writer.write_table(table)
+
+
+def read_encrypted_parquet(path, decryption_config,
+ kms_connection_config, crypto_factory):
+ file_decryption_properties = crypto_factory.file_decryption_properties(
+ kms_connection_config, decryption_config)
+ assert(file_decryption_properties is not None)
+ meta = pq.read_metadata(
+ path, decryption_properties=file_decryption_properties)
+ assert(meta.num_columns == 3)
+ schema = pq.read_schema(
+ path, decryption_properties=file_decryption_properties)
+ assert(len(schema.names) == 3)
+
+ result = pq.ParquetFile(
+ path, decryption_properties=file_decryption_properties)
+ return result.read(use_threads=False)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_read_wrong_key(tempdir):
+ """Write an encrypted parquet, verify it's encrypted,
+ and then read it using wrong keys."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ wrong_kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ # Wrong keys - mixup in names
+ FOOTER_KEY_NAME: COL_KEY.decode("UTF-8"),
+ COL_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ }
+ )
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ with pytest.raises(InvalidToken):
Review comment:
When you use `pytest.raises`, please enclose only the statement that is
supposed to raise. It makes the test more robust against other unexpected
errors.
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = Fernet.generate_key()
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+)
+
+
+class InMemoryKmsClient(pq.KmsClient):
+ """This is a mock class implementation of KmsClient, built for testing
only.
+ """
+
+ def __init__(self, config):
+ """Create an InMemoryKmsClient instance."""
+ pq.KmsClient.__init__(self)
+ self.master_keys_map = config.custom_kms_conf
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ """Wrap key key_bytes with key identified by master_key_identifier.
+ The result contains nonce concatenated before the encrypted key."""
+ master_key = self.master_keys_map[master_key_identifier]
+ # Create a cipher object to encrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ encrypted_key = cipher.encrypt(key_bytes)
+ result = base64.b64encode(encrypted_key)
+ return result
+
+ def unwrap_key(self, wrapped_key, master_key_identifier):
+ """Unwrap wrapped_key with key identified by master_key_identifier"""
+ master_key = self.master_keys_map[master_key_identifier]
+ decoded_wrapped_key = base64.b64decode(wrapped_key)
+ # Create a cipher object to decrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ decrypted_key = cipher.decrypt(decoded_wrapped_key)
+ return decrypted_key
+
+
+def verify_file_encrypted(path):
+ """Verify that the file is encrypted by looking at its first 4 bytes.
+ If it's the magic string PARE
+ then this is a parquet with encrypted footer."""
+ with open(path, "rb") as file:
+ magic_str = file.read(4)
+ # Verify magic string for parquet with encrypted footer is PARE
+ assert(magic_str == b'PARE')
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_read(tempdir):
+ """Write an encrypted parquet, verify it's encrypted, and then read it."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ result_table = read_encrypted_parquet(
+ path, decryption_config, kms_connection_config, crypto_factory)
+ assert table.equals(result_table)
+
+
+def write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory):
+ file_encryption_properties = crypto_factory.file_encryption_properties(
+ kms_connection_config, encryption_config)
+ assert(file_encryption_properties is not None)
+ with pq.ParquetWriter(
+ path, table.schema,
+ encryption_properties=file_encryption_properties) as writer:
+ writer.write_table(table)
+
+
+def read_encrypted_parquet(path, decryption_config,
+ kms_connection_config, crypto_factory):
+ file_decryption_properties = crypto_factory.file_decryption_properties(
+ kms_connection_config, decryption_config)
+ assert(file_decryption_properties is not None)
+ meta = pq.read_metadata(
+ path, decryption_properties=file_decryption_properties)
+ assert(meta.num_columns == 3)
+ schema = pq.read_schema(
+ path, decryption_properties=file_decryption_properties)
+ assert(len(schema.names) == 3)
+
+ result = pq.ParquetFile(
+ path, decryption_properties=file_decryption_properties)
+ return result.read(use_threads=False)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_read_wrong_key(tempdir):
+ """Write an encrypted parquet, verify it's encrypted,
+ and then read it using wrong keys."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ wrong_kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ # Wrong keys - mixup in names
+ FOOTER_KEY_NAME: COL_KEY.decode("UTF-8"),
+ COL_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ }
+ )
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ with pytest.raises(InvalidToken):
+ result_table = read_encrypted_parquet(
+ path, decryption_config, wrong_kms_connection_config,
+ crypto_factory)
+ assert(result_table is not None)
Review comment:
This `assert` is never reached, since the preceding call is expected to
raise `InvalidToken`.
##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -1192,7 +1197,8 @@ cdef shared_ptr[WriterProperties]
_create_writer_properties(
data_page_size=None,
compression_level=None,
use_byte_stream_split=False,
- data_page_version=None) except *:
+ data_page_version=None,
+ encryption_properties=None) except *:
Review comment:
If you write `FileEncryptionProperties encryption_properties=None`, you
can probably avoid the explicit type check below.
##########
File path: ci/scripts/python_wheel_windows_build.bat
##########
@@ -66,6 +67,7 @@ cmake ^
-DARROW_ORC=%ARROW_ORC% ^
-DARROW_PACKAGE_KIND="python-wheel-windows" ^
-DARROW_PARQUET=%ARROW_PARQUET% ^
+ -DPARQUET_REQUIRE_ENCRYPTION=%PARQUET_REQUIRE_ENCRYPTION% ^
Review comment:
Same question here: did you check that building the Windows wheels still
works with this option?
##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -23,6 +23,7 @@ from textwrap import indent
import warnings
import numpy as np
+from datetime import timedelta
Review comment:
Same here: can you try to keep imports ordered?
##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -1452,3 +1522,415 @@ cdef class ParquetWriter(_Weakrefable):
return result
raise RuntimeError(
'file metadata is only available after writer close')
+
+cdef class EncryptionConfiguration(_Weakrefable):
+ cdef:
+ shared_ptr[CEncryptionConfiguration] configuration
+
+ # Avoid mistakingly creating attributes
+ __slots__ = ()
+
+ def __init__(self, footer_key, *, column_keys=None,
+ uniform_encryption=None, encryption_algorithm=None,
+ plaintext_footer=None, double_wrapping=None,
+ cache_lifetime=None, internal_key_material=None,
+ data_key_length_bits=None):
+ self.configuration.reset(
+ new CEncryptionConfiguration(tobytes(footer_key)))
+ if column_keys is not None:
+ self.column_keys = column_keys
+ if uniform_encryption is not None:
+ self.uniform_encryption = uniform_encryption
+ if encryption_algorithm is not None:
+ self.encryption_algorithm = encryption_algorithm
+ if plaintext_footer is not None:
+ self.plaintext_footer = plaintext_footer
+ if double_wrapping is not None:
+ self.double_wrapping = double_wrapping
+ if cache_lifetime is not None:
+ self.cache_lifetime = cache_lifetime
+ if internal_key_material is not None:
+ self.internal_key_material = internal_key_material
+ if data_key_length_bits is not None:
+ self.data_key_length_bits = data_key_length_bits
+
+ @property
+ def footer_key(self):
+ """ID of the master key for footer encryption/signing"""
+ return frombytes(self.configuration.get().footer_key)
+
+ @property
+ def column_keys(self):
+ """
+ List of columns to encrypt, with master key IDs (see HIVE-21848).
+ Format: "masterKeyID:colName,colName;masterKeyID:colName..."
+ """
+ column_keys_str = frombytes(self.configuration.get().column_keys)
+ column_keys_to_key_list_str = dict(subString.split(
+ ":") for subString in column_keys_str.split(";"))
+ column_keys_dict = {k: v.split(
+ ",") for k, v in column_keys_to_key_list_str.items()}
+ return column_keys_dict
+
+ @column_keys.setter
+ def column_keys(self, dict value):
+ if value is not None:
+ # convert a dictionary such as
+ # '{"footer": ["b ", "d"], "a": ["a ", "f"]}''
+ # to the string defined by the spec 'footer: b , d; a: a , f'
+ column_keys = "; ".join(
+ ["{}: {}".format(k, ", ".join(v)) for k, v in value.items()])
+ self.configuration.get().column_keys = tobytes(column_keys)
+
+ @property
+ def uniform_encryption(self):
+ """Encrypt footer and all columns with the same encryption key."""
+ return self.configuration.get().uniform_encryption
+
+ @uniform_encryption.setter
+ def uniform_encryption(self, value):
+ self.configuration.get().uniform_encryption = value
+
+ @property
+ def encryption_algorithm(self):
+ """Parquet encryption algorithm.
+ Can be "AES_GCM_V1" (default), or "AES_GCM_CTR_V1"."""
+ return cipher_to_name(self.configuration.get().encryption_algorithm)
+
+ @encryption_algorithm.setter
+ def encryption_algorithm(self, value):
+ cipher = cipher_from_name(value)
+ self.configuration.get().encryption_algorithm = cipher
+
+ @property
+ def plaintext_footer(self):
+ """Write files with plaintext footer."""
+ return self.configuration.get().plaintext_footer
+
+ @plaintext_footer.setter
+ def plaintext_footer(self, value):
+ self.configuration.get().plaintext_footer = value
+
+ @property
+ def double_wrapping(self):
+ """Use double wrapping - where data encryption keys (DEKs) are
+ encrypted with key encryption keys (KEKs), which in turn are
+ encrypted with master keys.
+ If set to false, use single wrapping - where DEKs are
+ encrypted directly with master keys."""
+ return self.configuration.get().double_wrapping
+
+ @double_wrapping.setter
+ def double_wrapping(self, value):
+ self.configuration.get().double_wrapping = value
+
+ @property
+ def cache_lifetime(self):
+ """Lifetime of cached entities (key encryption keys,
+ local wrapping keys, KMS client objects)."""
+ return timedelta(
+ seconds=self.configuration.get().cache_lifetime_seconds)
+
+ @cache_lifetime.setter
+ def cache_lifetime(self, value):
+ if not isinstance(value, timedelta):
+ raise TypeError("cache_lifetime should be a timedelta")
+ self.configuration.get().cache_lifetime_seconds = value.total_seconds()
+
+ @property
+ def internal_key_material(self):
+ """Store key material inside Parquet file footers; this mode doesn’t
+ produce additional files. If set to false, key material is stored in
+ separate files in the same folder, which enables key rotation for
+ immutable Parquet files."""
+ return self.configuration.get().internal_key_material
+
+ @internal_key_material.setter
+ def internal_key_material(self, value):
+ self.configuration.get().internal_key_material = value
+
+ @property
+ def data_key_length_bits(self):
+ """Length of data encryption keys (DEKs), randomly generated by
parquet key
+ management tools. Can be 128, 192 or 256 bits."""
+ return self.configuration.get().data_key_length_bits
+
+ @data_key_length_bits.setter
+ def data_key_length_bits(self, value):
+ self.configuration.get().data_key_length_bits = value
+
+ cdef inline shared_ptr[CEncryptionConfiguration] unwrap(self) nogil:
+ return self.configuration
+
+cdef class DecryptionConfiguration(_Weakrefable):
+ cdef:
+ shared_ptr[CDecryptionConfiguration] configuration
+
+ # Avoid mistakingly creating attributes
+ __slots__ = ()
+
+ def __init__(self, *, cache_lifetime=None):
+ self.configuration.reset(new CDecryptionConfiguration())
+
+ @property
+ def cache_lifetime(self):
+ """Lifetime of cached entities (key encryption keys,
+ local wrapping keys, KMS client objects)."""
+ return timedelta(
+ seconds=self.configuration.get().cache_lifetime_seconds)
+
+ @cache_lifetime.setter
+ def cache_lifetime(self, value):
+ self.configuration.get().cache_lifetime_seconds = value.total_seconds()
+
+ cdef inline shared_ptr[CDecryptionConfiguration] unwrap(self) nogil:
+ return self.configuration
+
+
+cdef class KmsConnectionConfig(_Weakrefable):
+ cdef:
+ shared_ptr[CKmsConnectionConfig] configuration
+
+ # Avoid mistakingly creating attributes
+ __slots__ = ()
+
+ def __init__(self, *, kms_instance_id=None, kms_instance_url=None,
+ key_access_token=None, custom_kms_conf=None):
+ self.configuration.reset(new CKmsConnectionConfig())
+ if kms_instance_id is not None:
+ self.kms_instance_id = kms_instance_id
+ if kms_instance_url is not None:
+ self.kms_instance_url = kms_instance_url
+ if key_access_token is None:
+ self.key_access_token = b'DEFAULT'
+ else:
+ self.key_access_token = key_access_token
+ if custom_kms_conf is not None:
+ self.custom_kms_conf = custom_kms_conf
+
+ @property
+ def kms_instance_id(self):
+ """ID of the KMS instance that will be used for encryption
+ (if multiple KMS instances are available)."""
+ return frombytes(self.configuration.get().kms_instance_id)
+
+ @kms_instance_id.setter
+ def kms_instance_id(self, value):
+ self.configuration.get().kms_instance_id = tobytes(value)
+
+ @property
+ def kms_instance_url(self):
+ """URL of the KMS instance."""
+ return frombytes(self.configuration.get().kms_instance_url)
+
+ @kms_instance_url.setter
+ def kms_instance_url(self, value):
+ self.configuration.get().kms_instance_url = tobytes(value)
+
+ @property
+ def key_access_token(self):
+ """Authorization token that will be passed to KMS."""
+ return frombytes(self.configuration.get()
+ .refreshable_key_access_token.get().value())
+
+ @key_access_token.setter
+ def key_access_token(self, value):
+ self.refresh_key_access_token(value)
+
+ @property
+ def custom_kms_conf(self):
+ """A dictionary with KMS-type-specific configuration"""
+ custom_kms_conf = {
+ frombytes(k): frombytes(v)
+ for k, v in self.configuration.get().custom_kms_conf
+ }
+ return custom_kms_conf
+
+ @custom_kms_conf.setter
+ def custom_kms_conf(self, dict value):
+ if value is not None:
+ for k, v in value.items():
+ if isinstance(k, str) and isinstance(v, str):
+ self.configuration.get().custom_kms_conf[tobytes(k)] = \
+ tobytes(v)
+ else:
+ raise TypeError("Expected custom_kms_conf to be " +
+ "a dictionary of strings")
+
+ def refresh_key_access_token(self, value):
+ cdef:
+ shared_ptr[CKeyAccessToken] c_key_access_token = \
+ self.configuration.get().refreshable_key_access_token
+
+ c_key_access_token.get().Refresh(tobytes(value))
+
+ cdef inline shared_ptr[CKmsConnectionConfig] unwrap(self) nogil:
+ return self.configuration
+
+ @staticmethod
+ cdef wrap(const CKmsConnectionConfig& config):
+ result = KmsConnectionConfig()
+ result.configuration = make_shared[CKmsConnectionConfig](move(config))
+ return result
+
+# Callback definitions for CPyKmsClientVtable
+cdef void _cb_wrap_key(
+ handler, const c_string& key_bytes,
+ const c_string& master_key_identifier, c_string* out) except *:
+ mkid_str = frombytes(master_key_identifier)
+ wrapped_key = handler.wrap_key(key_bytes, mkid_str)
+ out[0] = tobytes(wrapped_key)
+
+cdef void _cb_unwrap_key(
+ handler, const c_string& wrapped_key,
+ const c_string& master_key_identifier, c_string* out) except *:
+ mkid_str = frombytes(master_key_identifier)
+ wk_str = frombytes(wrapped_key)
+ key = handler.unwrap_key(wk_str, mkid_str)
+ out[0] = tobytes(key)
+
+cdef class KmsClient(_Weakrefable):
+ """The abstract base class for KmsClient implementations."""
+ cdef:
+ shared_ptr[CKmsClient] client
+
+ def __init__(self):
+ self.init()
+
+ cdef init(self):
+ cdef:
+ CPyKmsClientVtable vtable = CPyKmsClientVtable()
+
+ vtable.wrap_key = _cb_wrap_key
+ vtable.unwrap_key = _cb_unwrap_key
+
+ self.client.reset(new CPyKmsClient(self, vtable))
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ raise NotImplementedError()
+
+ def unwrap_key(self, wrapped_key, master_key_identifier):
+ raise NotImplementedError()
+
+ cdef inline shared_ptr[CKmsClient] unwrap(self) nogil:
+ return self.client
+
+
+# Callback definition for CPyKmsClientFactoryVtable
+cdef void _cb_create_kms_client(
+ handler,
+ const CKmsConnectionConfig& kms_connection_config,
+ shared_ptr[CKmsClient]* out) except *:
+ connection_config = KmsConnectionConfig.wrap(kms_connection_config)
+
+ result = handler(connection_config)
+ if not isinstance(result, KmsClient):
+ raise TypeError(
+ "callable must return KmsClient instances, but got {}".format(
+ type(result)))
+
+ out[0] = (<KmsClient> result).unwrap()
+
+cdef class CryptoFactory(_Weakrefable):
+ """ A factory that produces the low-level FileEncryptionProperties and
+ FileDecryptionProperties objects, from the high-level parameters."""
+ cdef:
+ unique_ptr[CCryptoFactory] factory
+
+ # Avoid mistakingly creating attributes
+ __slots__ = ()
+
+ def __init__(self, kms_client_factory):
+ """Create CryptoFactory.
+
+ Parameters
+ ----------
+ kms_client_factory : a callable that accepts KmsConnectionConfig
+ and returns a KmsClient
+ """
+ self.factory.reset(new CCryptoFactory())
+
+ if callable(kms_client_factory):
+ self.init(kms_client_factory)
+ else:
+ raise TypeError("Parameter kms_client_factory must be a callable")
+
+ cdef init(self, callable_client_factory):
+ cdef:
+ CPyKmsClientFactoryVtable vtable
+ shared_ptr[CPyKmsClientFactory] kms_client_factory
+
+ vtable.create_kms_client = _cb_create_kms_client
+ kms_client_factory.reset(
+ new CPyKmsClientFactory(callable_client_factory, vtable))
+ # A KmsClientFactory object must be registered
+ # via this method before calling any of
+ # file_encryption_properties()/file_decryption_properties() methods.
+ self.factory.get().RegisterKmsClientFactory(
+ static_pointer_cast[CKmsClientFactory, CPyKmsClientFactory](
+ kms_client_factory))
+
+ def file_encryption_properties(self,
+ KmsConnectionConfig kms_connection_config,
+ EncryptionConfiguration encryption_config):
+ """Create file encryption properties.
+
+ Parameters
+ ----------
+ kms_connection_config : KmsConnectionConfig
+ Configuration of connection to KMS
+
+ encryption_config : EncryptionConfiguration
+ Configuration of the encryption, such as which columns to encrypt
+
+ Returns
+ -------
+ file_encryption_properties : FileEncryptionProperties or None
Review comment:
Why can it return None?
##########
File path: python/examples/minimal_build/build_venv.sh
##########
@@ -54,6 +54,7 @@ cmake -GNinja \
-DARROW_WITH_SNAPPY=ON \
-DARROW_WITH_BROTLI=ON \
-DARROW_PARQUET=ON \
+ -DPARQUET_REQUIRE_ENCRYPTION=ON \
Review comment:
This is a minimal build; I don't think we want to add this option here.
##########
File path: python/pyarrow/parquet.py
##########
@@ -2215,7 +2245,7 @@ def write_metadata(schema, where,
metadata_collector=None, **kwargs):
metadata.write_metadata_file(where)
-def read_metadata(where, memory_map=False):
+def read_metadata(where, memory_map=False, decryption_properties=None):
Review comment:
I don't think it's useful to change this simple wrapper function. People
can probably call `ParquetFile` directly.
##########
File path: python/examples/parquet_encryption/sample_vault_kms_client.py
##########
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""A sample KmsClient implementation."""
+
+import pyarrow as pa
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import requests
+import base64
+from typing import OrderedDict
Review comment:
Why are you importing the `typing` module? Do you mean `from collections
import OrderedDict`?
##########
File path: python/pyarrow/_parquet.pxd
##########
@@ -555,3 +586,110 @@ cdef class Statistics(_Weakrefable):
ColumnChunkMetaData parent):
self.statistics = statistics
self.parent = parent
+
+cdef extern from "parquet/exception.h" namespace "parquet" nogil:
+ cdef cppclass ParquetException:
+ pass
Review comment:
Hmm, why is this exposed? Cython shouldn't have to deal with Parquet
exceptions, only with Arrow `Status` values.
##########
File path: python/examples/parquet_encryption/sample_vault_kms_client.py
##########
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""A sample KmsClient implementation."""
+
+import pyarrow as pa
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import requests
+import base64
+from typing import OrderedDict
+import os
+import argparse
Review comment:
Can you try to keep Python imports ordered? Usually:
- first, stdlib imports
- second, third-party imports
- third, PyArrow imports
(also, each batch of imports should be kept lexicographically ordered as
well, for readability)
##########
File path: python/examples/parquet_encryption/sample_vault_kms_client.py
##########
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""A sample KmsClient implementation."""
+
+import pyarrow as pa
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import requests
+import base64
+from typing import OrderedDict
+import os
+import argparse
+
+
+class VaultClient(pq.KmsClient):
+ """An example of a KmsClient implementation with master keys
+ managed by Hashicorp Vault KMS.
+ Not for production use!
+ """
+ JSON_MEDIA_TYPE = "application/json; charset=utf-8"
+ DEFAULT_TRANSIT_ENGINE = "/v1/transit/"
+ WRAP_ENDPOINT = "encrypt/"
+ UNWRAP_ENDPOINT = "decrypt/"
+ TOKEN_HEADER = "X-Vault-Token"
+
+ def __init__(self, kms_connection_config):
+ """Create a VaultClient instance.
+
+ Parameters
+ ----------
+ kms_connection_config : KmsConnectionConfig
+ configuration parameters to connect to vault,
+ e.g. URL and access token
+ """
+ pq.KmsClient.__init__(self)
+ self.kms_url = kms_connection_config.kms_instance_url + \
+ VaultClient.DEFAULT_TRANSIT_ENGINE
+ self.kms_connection_config = kms_connection_config
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ """Call Vault to wrap key key_bytes with key
+ identified by master_key_identifier."""
+ endpoint = self.kms_url + VaultClient.WRAP_ENDPOINT
+ headers = {VaultClient.TOKEN_HEADER:
+ self.kms_connection_config.key_access_token}
+ r = requests.post(endpoint + master_key_identifier,
+ headers=headers,
+ data={'plaintext': base64.b64encode(key_bytes)})
+ r.raise_for_status()
+ r_dict = r.json()
+ wrapped_key = r_dict['data']['ciphertext']
+ return wrapped_key
+
+ def unwrap_key(self, wrapped_key, master_key_identifier):
+ """Call Vault to unwrap wrapped_key with key
+ identified by master_key_identifier"""
+ endpoint = self.kms_url + VaultClient.UNWRAP_ENDPOINT
+ headers = {VaultClient.TOKEN_HEADER:
+ self.kms_connection_config.key_access_token}
+ r = requests.post(endpoint + master_key_identifier,
+ headers=headers,
+ data={'ciphertext': wrapped_key})
+ r.raise_for_status()
+ r_dict = r.json()
+ plaintext = r_dict['data']['plaintext']
+ key_bytes = base64.b64decode(plaintext)
+ return key_bytes
+
+
+def parquet_write_read_with_vault(parquet_filename):
+ """An example for writing an encrypted parquet and reading an
+ encrypted parquet using master keys managed by Hashicorp Vault KMS.
+ Note that for this implementation requests dependency is needed
+ and environment properties VAULT_URL and VAULT_TOKEN should be set.
+ Please enable the transit engine.
+ """
+ path = parquet_filename
+
+ table = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+ )
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` with one key
+ # and column `b` with another key,
+ # keep `c` plaintext
+ footer_key_name = "footer_key"
+ col_a_key_name = "col_a_key"
+ col_b_key_name = "col_b_key"
+
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=footer_key_name,
+ column_keys={
+ col_a_key_name: ["a"],
+ col_b_key_name: ["b"],
+ })
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ kms_instance_url=os.environ.get('VAULT_URL', ''),
+ key_access_token=os.environ.get('VAULT_TOKEN', ''),
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return VaultClient(kms_connection_configuration)
+
+ # Write with encryption properties
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ file_encryption_properties = crypto_factory.file_encryption_properties(
+ kms_connection_config, encryption_config)
+ with pq.ParquetWriter(path,
+ table.schema,
+ encryption_properties=file_encryption_properties) \
+ as writer:
+ writer.write_table(table)
+
+ # Read with decryption properties
+ file_decryption_properties = crypto_factory.file_decryption_properties(
+ kms_connection_config)
+ result = pq.ParquetFile(
+ path, decryption_properties=file_decryption_properties)
+ result_table = result.read()
+ assert table.equals(result_table)
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Write and read an encrypted parquet using master keys " /
Review comment:
Nit, but you don't need to end lines with a `\` when you're inside
parentheses.
##########
File path: python/pyarrow/parquet.py
##########
@@ -2224,15 +2254,18 @@ def read_metadata(where, memory_map=False):
where : str (filepath) or file-like object
memory_map : bool, default False
Create memory map when the source is a file path.
+ decryption_properties : FileDecryptionProperties, default None
+ Decryption properties for reading encrypted Parquet files.
Returns
-------
metadata : FileMetadata
"""
- return ParquetFile(where, memory_map=memory_map).metadata
+ return ParquetFile(where, memory_map=memory_map,
+ decryption_properties=decryption_properties).metadata
-def read_schema(where, memory_map=False):
+def read_schema(where, memory_map=False, decryption_properties=None):
Review comment:
Same here.
##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -1452,3 +1522,415 @@ cdef class ParquetWriter(_Weakrefable):
return result
raise RuntimeError(
'file metadata is only available after writer close')
+
+cdef class EncryptionConfiguration(_Weakrefable):
+ cdef:
+ shared_ptr[CEncryptionConfiguration] configuration
+
+ # Avoid mistakingly creating attributes
+ __slots__ = ()
+
+ def __init__(self, footer_key, *, column_keys=None,
+ uniform_encryption=None, encryption_algorithm=None,
+ plaintext_footer=None, double_wrapping=None,
+ cache_lifetime=None, internal_key_material=None,
+ data_key_length_bits=None):
+ self.configuration.reset(
+ new CEncryptionConfiguration(tobytes(footer_key)))
+ if column_keys is not None:
+ self.column_keys = column_keys
+ if uniform_encryption is not None:
+ self.uniform_encryption = uniform_encryption
+ if encryption_algorithm is not None:
+ self.encryption_algorithm = encryption_algorithm
+ if plaintext_footer is not None:
+ self.plaintext_footer = plaintext_footer
+ if double_wrapping is not None:
+ self.double_wrapping = double_wrapping
+ if cache_lifetime is not None:
+ self.cache_lifetime = cache_lifetime
+ if internal_key_material is not None:
+ self.internal_key_material = internal_key_material
+ if data_key_length_bits is not None:
+ self.data_key_length_bits = data_key_length_bits
+
+ @property
+ def footer_key(self):
+ """ID of the master key for footer encryption/signing"""
+ return frombytes(self.configuration.get().footer_key)
+
+ @property
+ def column_keys(self):
+ """
+ List of columns to encrypt, with master key IDs (see HIVE-21848).
+ Format: "masterKeyID:colName,colName;masterKeyID:colName..."
+ """
+ column_keys_str = frombytes(self.configuration.get().column_keys)
+ column_keys_to_key_list_str = dict(subString.split(
+ ":") for subString in column_keys_str.split(";"))
+ column_keys_dict = {k: v.split(
+ ",") for k, v in column_keys_to_key_list_str.items()}
+ return column_keys_dict
+
+ @column_keys.setter
+ def column_keys(self, dict value):
+ if value is not None:
+ # convert a dictionary such as
+ # '{"footer": ["b ", "d"], "a": ["a ", "f"]}''
+ # to the string defined by the spec 'footer: b , d; a: a , f'
+ column_keys = "; ".join(
+ ["{}: {}".format(k, ", ".join(v)) for k, v in value.items()])
+ self.configuration.get().column_keys = tobytes(column_keys)
+
+ @property
+ def uniform_encryption(self):
Review comment:
For the most part, there don't seem to be any unit tests for these
properties. Can you add at least basic get/set tests?
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
Review comment:
This constant doesn't seem useful. Just use `"footer_key"`
everywhere.
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = Fernet.generate_key()
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
Review comment:
So the footer key has the value "footer_key"? This looks weird.
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = Fernet.generate_key()
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+)
+
+
+class InMemoryKmsClient(pq.KmsClient):
+ """This is a mock class implementation of KmsClient, built for testing
only.
+ """
+
+ def __init__(self, config):
+ """Create an InMemoryKmsClient instance."""
+ pq.KmsClient.__init__(self)
+ self.master_keys_map = config.custom_kms_conf
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ """Wrap key key_bytes with key identified by master_key_identifier.
+ The result contains nonce concatenated before the encrypted key."""
+ master_key = self.master_keys_map[master_key_identifier]
+ # Create a cipher object to encrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ encrypted_key = cipher.encrypt(key_bytes)
+ result = base64.b64encode(encrypted_key)
+ return result
+
+ def unwrap_key(self, wrapped_key, master_key_identifier):
+ """Unwrap wrapped_key with key identified by master_key_identifier"""
+ master_key = self.master_keys_map[master_key_identifier]
+ decoded_wrapped_key = base64.b64decode(wrapped_key)
+ # Create a cipher object to decrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ decrypted_key = cipher.decrypt(decoded_wrapped_key)
+ return decrypted_key
+
+
+def verify_file_encrypted(path):
+ """Verify that the file is encrypted by looking at its first 4 bytes.
+ If it's the magic string PARE
+ then this is a parquet with encrypted footer."""
+ with open(path, "rb") as file:
+ magic_str = file.read(4)
+ # Verify magic string for parquet with encrypted footer is PARE
+ assert(magic_str == b'PARE')
+
+
[email protected]
+def test_encrypted_parquet_write_read(tempdir):
+ """Write an encrypted parquet, verify it's encrypted, and then read it."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ result_table = read_encrypted_parquet(
+ path, decryption_config, kms_connection_config, crypto_factory)
+ assert table.equals(result_table)
+
+
+def write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory):
+ file_encryption_properties = crypto_factory.file_encryption_properties(
+ kms_connection_config, encryption_config)
+ assert(file_encryption_properties is not None)
+ with pq.ParquetWriter(
+ path, table.schema,
+ encryption_properties=file_encryption_properties) as writer:
+ writer.write_table(table)
+
+
+def read_encrypted_parquet(path, decryption_config,
+ kms_connection_config, crypto_factory):
+ file_decryption_properties = crypto_factory.file_decryption_properties(
+ kms_connection_config, decryption_config)
+ assert(file_decryption_properties is not None)
+ meta = pq.read_metadata(
+ path, decryption_properties=file_decryption_properties)
+ assert(meta.num_columns == 3)
+ schema = pq.read_schema(
+ path, decryption_properties=file_decryption_properties)
+ assert(len(schema.names) == 3)
+
+ result = pq.ParquetFile(
+ path, decryption_properties=file_decryption_properties)
+ return result.read(use_threads=False)
Review comment:
Is `use_threads=False` deliberate? Is there a problem when using
multiple threads?
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = Fernet.generate_key()
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+)
+
+
+class InMemoryKmsClient(pq.KmsClient):
+ """This is a mock class implementation of KmsClient, built for testing
only.
+ """
+
+ def __init__(self, config):
+ """Create an InMemoryKmsClient instance."""
+ pq.KmsClient.__init__(self)
+ self.master_keys_map = config.custom_kms_conf
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ """Wrap key key_bytes with key identified by master_key_identifier.
+ The result contains nonce concatenated before the encrypted key."""
+ master_key = self.master_keys_map[master_key_identifier]
+ # Create a cipher object to encrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
Review comment:
Since this is a basic test class, what does using `cryptography` bring
here? You can just fake the "cipher".
##########
File path: ci/conda_env_python.txt
##########
@@ -30,3 +30,4 @@ pytz
s3fs>=0.4
setuptools
setuptools_scm
+cryptography
Review comment:
Can you keep this file lexicographically ordered?
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = Fernet.generate_key()
+COL_KEY_NAME = "col_key"
Review comment:
Same.
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = Fernet.generate_key()
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+)
+
+
+class InMemoryKmsClient(pq.KmsClient):
+ """This is a mock class implementation of KmsClient, built for testing
only.
+ """
+
+ def __init__(self, config):
+ """Create an InMemoryKmsClient instance."""
+ pq.KmsClient.__init__(self)
+ self.master_keys_map = config.custom_kms_conf
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ """Wrap key key_bytes with key identified by master_key_identifier.
+ The result contains nonce concatenated before the encrypted key."""
+ master_key = self.master_keys_map[master_key_identifier]
+ # Create a cipher object to encrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ encrypted_key = cipher.encrypt(key_bytes)
+ result = base64.b64encode(encrypted_key)
+ return result
+
+ def unwrap_key(self, wrapped_key, master_key_identifier):
+ """Unwrap wrapped_key with key identified by master_key_identifier"""
+ master_key = self.master_keys_map[master_key_identifier]
+ decoded_wrapped_key = base64.b64decode(wrapped_key)
+ # Create a cipher object to decrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ decrypted_key = cipher.decrypt(decoded_wrapped_key)
+ return decrypted_key
+
+
+def verify_file_encrypted(path):
+ """Verify that the file is encrypted by looking at its first 4 bytes.
+ If it's the magic string PARE
+ then this is a parquet with encrypted footer."""
+ with open(path, "rb") as file:
+ magic_str = file.read(4)
+ # Verify magic string for parquet with encrypted footer is PARE
+ assert(magic_str == b'PARE')
+
+
[email protected]
+def test_encrypted_parquet_write_read(tempdir):
+ """Write an encrypted parquet, verify it's encrypted, and then read it."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ result_table = read_encrypted_parquet(
+ path, decryption_config, kms_connection_config, crypto_factory)
+ assert table.equals(result_table)
+
+
+def write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory):
+ file_encryption_properties = crypto_factory.file_encryption_properties(
+ kms_connection_config, encryption_config)
+ assert(file_encryption_properties is not None)
+ with pq.ParquetWriter(
+ path, table.schema,
+ encryption_properties=file_encryption_properties) as writer:
+ writer.write_table(table)
+
+
+def read_encrypted_parquet(path, decryption_config,
+ kms_connection_config, crypto_factory):
+ file_decryption_properties = crypto_factory.file_decryption_properties(
+ kms_connection_config, decryption_config)
+ assert(file_decryption_properties is not None)
+ meta = pq.read_metadata(
+ path, decryption_properties=file_decryption_properties)
+ assert(meta.num_columns == 3)
+ schema = pq.read_schema(
+ path, decryption_properties=file_decryption_properties)
+ assert(len(schema.names) == 3)
+
+ result = pq.ParquetFile(
+ path, decryption_properties=file_decryption_properties)
+ return result.read(use_threads=False)
+
+
[email protected]
+def test_encrypted_parquet_write_read_wrong_key(tempdir):
+ """Write an encrypted parquet, verify it's encrypted,
+ and then read it using wrong keys."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ wrong_kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ # Wrong keys - mixup in names
+ FOOTER_KEY_NAME: COL_KEY.decode("UTF-8"),
+ COL_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ }
+ )
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ with pytest.raises(InvalidToken):
+ result_table = read_encrypted_parquet(
+ path, decryption_config, wrong_kms_connection_config,
+ crypto_factory)
+ assert(result_table is not None)
+
+
[email protected]
+def test_encrypted_parquet_read_no_decryption_config(tempdir):
+ """Write an encrypted parquet, verify it's encrypted,
+ but then try to read it without decryption properties."""
+ with pytest.raises(IOError, match=r"no decryption"):
+ test_encrypted_parquet_write_read(tempdir)
+ path = tempdir / PARQUET_NAME
+ result = pq.ParquetFile(path)
+ assert(result is not None)
Review comment:
Same here and below: the `assert` is never reached?
##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,388 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyarrow as pa
+import pytest
+
+try:
+ import pyarrow.parquet as pq
+except ImportError:
+ pq = None
+
+import base64
+from cryptography.fernet import Fernet
+from cryptography.fernet import InvalidToken
+from collections import OrderedDict
+from datetime import timedelta
+
+DATA_TABLE = pa.Table.from_pydict(
+ OrderedDict([
+ ('a', pa.array([1, 2, 3])),
+ ('b', pa.array(['a', 'b', 'c'])),
+ ('c', pa.array(['x', 'y', 'z']))
+ ])
+)
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = Fernet.generate_key()
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = Fernet.generate_key()
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+)
+
+
+class InMemoryKmsClient(pq.KmsClient):
+ """This is a mock class implementation of KmsClient, built for testing
only.
+ """
+
+ def __init__(self, config):
+ """Create an InMemoryKmsClient instance."""
+ pq.KmsClient.__init__(self)
+ self.master_keys_map = config.custom_kms_conf
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ """Wrap key key_bytes with key identified by master_key_identifier.
+ The result contains nonce concatenated before the encrypted key."""
+ master_key = self.master_keys_map[master_key_identifier]
+ # Create a cipher object to encrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ encrypted_key = cipher.encrypt(key_bytes)
+ result = base64.b64encode(encrypted_key)
+ return result
+
+ def unwrap_key(self, wrapped_key, master_key_identifier):
+ """Unwrap wrapped_key with key identified by master_key_identifier"""
+ master_key = self.master_keys_map[master_key_identifier]
+ decoded_wrapped_key = base64.b64decode(wrapped_key)
+ # Create a cipher object to decrypt data
+ cipher = Fernet(master_key.encode('utf-8'))
+ decrypted_key = cipher.decrypt(decoded_wrapped_key)
+ return decrypted_key
+
+
+def verify_file_encrypted(path):
+ """Verify that the file is encrypted by looking at its first 4 bytes.
+ If it's the magic string PARE
+ then this is a parquet with encrypted footer."""
+ with open(path, "rb") as file:
+ magic_str = file.read(4)
+ # Verify magic string for parquet with encrypted footer is PARE
+ assert(magic_str == b'PARE')
+
+
[email protected]
+def test_encrypted_parquet_write_read(tempdir):
+ """Write an encrypted parquet, verify it's encrypted, and then read it."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ result_table = read_encrypted_parquet(
+ path, decryption_config, kms_connection_config, crypto_factory)
+ assert table.equals(result_table)
+
+
+def write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory):
+ file_encryption_properties = crypto_factory.file_encryption_properties(
+ kms_connection_config, encryption_config)
+ assert(file_encryption_properties is not None)
+ with pq.ParquetWriter(
+ path, table.schema,
+ encryption_properties=file_encryption_properties) as writer:
+ writer.write_table(table)
+
+
+def read_encrypted_parquet(path, decryption_config,
+ kms_connection_config, crypto_factory):
+ file_decryption_properties = crypto_factory.file_decryption_properties(
+ kms_connection_config, decryption_config)
+ assert(file_decryption_properties is not None)
+ meta = pq.read_metadata(
+ path, decryption_properties=file_decryption_properties)
+ assert(meta.num_columns == 3)
+ schema = pq.read_schema(
+ path, decryption_properties=file_decryption_properties)
+ assert(len(schema.names) == 3)
+
+ result = pq.ParquetFile(
+ path, decryption_properties=file_decryption_properties)
+ return result.read(use_threads=False)
+
+
[email protected]
+def test_encrypted_parquet_write_read_wrong_key(tempdir):
+ """Write an encrypted parquet, verify it's encrypted,
+ and then read it using wrong keys."""
+ path = tempdir / PARQUET_NAME
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key,
+ # encrypt column `a` and column `b` with another key,
+ # keep `c` plaintext
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME,
+ column_keys={
+ COL_KEY_NAME: ["a", "b"],
+ },
+ encryption_algorithm="AES_GCM_V1",
+ cache_lifetime=timedelta(minutes=5.0),
+ data_key_length_bits=256)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+ verify_file_encrypted(path)
+
+ # Read with decryption properties
+ wrong_kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ # Wrong keys - mixup in names
+ FOOTER_KEY_NAME: COL_KEY.decode("UTF-8"),
+ COL_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ }
+ )
+ decryption_config = pq.DecryptionConfiguration(
+ cache_lifetime=timedelta(minutes=5.0))
+ with pytest.raises(InvalidToken):
+ result_table = read_encrypted_parquet(
+ path, decryption_config, wrong_kms_connection_config,
+ crypto_factory)
+ assert(result_table is not None)
+
+
[email protected]
+def test_encrypted_parquet_read_no_decryption_config(tempdir):
+ """Write an encrypted parquet, verify it's encrypted,
+ but then try to read it without decryption properties."""
+ with pytest.raises(IOError, match=r"no decryption"):
+ test_encrypted_parquet_write_read(tempdir)
+ path = tempdir / PARQUET_NAME
+ result = pq.ParquetFile(path)
+ assert(result is not None)
+
+
[email protected]
+def test_encrypted_parquet_read_metadata_no_decryption_config(tempdir):
+ """Write an encrypted parquet, verify it's encrypted,
+ but then try to read its metadata without decryption properties."""
+ with pytest.raises(IOError, match=r"no decryption"):
+ test_encrypted_parquet_write_read(tempdir)
+ path = tempdir / PARQUET_NAME
+ meta = pq.read_metadata(path)
+ assert(meta is not None)
+
+
[email protected]
+def test_encrypted_parquet_read_schema_no_decryption_config(tempdir):
+ """Write an encrypted parquet, verify it's encrypted,
+ but then try to read its schema without decryption properties."""
+ with pytest.raises(IOError, match=r"no decryption"):
+ test_encrypted_parquet_write_read(tempdir)
+ path = tempdir / PARQUET_NAME
+ schema = pq.read_schema(path)
+ assert(schema is not None)
+
+
[email protected]
+def test_encrypted_parquet_write_no_col_key(tempdir):
+ """Write an encrypted parquet, but give only footer key,
+ without column key."""
+ path = tempdir / 'encrypted_table_no_col_key.in_mem.parquet'
+ table = DATA_TABLE
+
+ # Encrypt the footer with the footer key
+ encryption_config = pq.EncryptionConfiguration(
+ footer_key=FOOTER_KEY_NAME)
+
+ kms_connection_config = pq.KmsConnectionConfig(
+ custom_kms_conf={
+ FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+ COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+ }
+ )
+
+ def kms_factory(kms_connection_configuration):
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ with pytest.raises(RuntimeError, match=r"column_keys"):
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+
+
[email protected]
+def test_encrypted_parquet_write_kms_error(tempdir):
+ """Write an encrypted parquet, but raise KeyError in KmsClient."""
+ path = tempdir / 'encrypted_table_kms_error.in_mem.parquet'
+ table = DATA_TABLE
+
+ encryption_config = BASIC_ENCRYPTION_CONFIG
+
+ # Empty master_keys_map
+ kms_connection_config = pq.KmsConnectionConfig()
+
+ def kms_factory(kms_connection_configuration):
+ # Empty master keys map will cause KeyError to be raised
+ # on wrap/unwrap calls
+ return InMemoryKmsClient(kms_connection_configuration)
+
+ with pytest.raises(RuntimeError, match="footer_key.*KeyError"):
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+
+
[email protected]
+def test_encrypted_parquet_write_kms_specific_error(tempdir):
+ """Write an encrypted parquet, but raise KeyError in KmsClient."""
+ path = tempdir / 'encrypted_table_kms_error.in_mem.parquet'
+ table = DATA_TABLE
+
+ encryption_config = BASIC_ENCRYPTION_CONFIG
+
+ # Empty master_keys_map
+ kms_connection_config = pq.KmsConnectionConfig()
+
+ class ThrowingKmsClient(pq.KmsClient):
+ """A KmsClient implementation that throws exception in
+ wrap/unwrap calls
+ """
+
+ def __init__(self, config):
+ """Create an InMemoryKmsClient instance."""
+ pq.KmsClient.__init__(self)
+ self.config = config
+
+ def wrap_key(self, key_bytes, master_key_identifier):
+ raise ValueError("Cannot Wrap Key")
+
+ def unwrap_key(self, wrapped_key, master_key_identifier):
+ raise ValueError("Cannot Unwrap Key")
+
+ def kms_factory(kms_connection_configuration):
+ # Exception thrown in wrap/unwrap calls
+ return ThrowingKmsClient(kms_connection_configuration)
+
+ with pytest.raises(RuntimeError, match="Cannot Wrap Key.*ValueError"):
+ crypto_factory = pq.CryptoFactory(kms_factory)
+ # Write with encryption properties
+ write_encrypted_parquet(path, table, encryption_config,
+ kms_connection_config, crypto_factory)
+
+
[email protected]
+def test_encrypted_parquet_write_kms_factory_error(tempdir):
+ """Write an encrypted parquet, but raise ValueError in kms_factory."""
+ path = tempdir / 'encrypted_table_kms_factory_error.in_mem.parquet'
+ table = DATA_TABLE
+
+ encryption_config = BASIC_ENCRYPTION_CONFIG
+
+ # Empty master_keys_map
+ kms_connection_config = pq.KmsConnectionConfig()
+
+ def kms_factory(kms_connection_configuration):
+ raise ValueError('Cannot create KmsClient')
+
+ with pytest.raises(RuntimeError,
+ match="Cannot create KmsClient.*ValueError"):
Review comment:
I'm not sure why a `RuntimeError` is raised here instead of the original
`ValueError`. Shouldn't the original exception propagate?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]