jorisvandenbossche commented on a change in pull request #10450:
URL: https://github.com/apache/arrow/pull/10450#discussion_r714751514



##########
File path: docs/source/python/parquet.rst
##########
@@ -604,3 +604,171 @@ One example is Azure Blob storage, which can be 
interfaced through the
 
     abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", 
container_name="XXXX")
     table = pq.read_table("file.parquet", filesystem=abfs)
+
+Parquet Modular Encryption (Columnar Encryption)
+------------------------------------------------
+
+Columnar encryption is supported for Parquet files starting from
+Apache Arrow 4.0.0.

Review comment:
       It was already supported before in C++? But were you able to use it from 
Python? (this PR is for 6.0.0)

##########
File path: docs/source/python/parquet.rst
##########
@@ -604,3 +604,171 @@ One example is Azure Blob storage, which can be 
interfaced through the
 
     abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", 
container_name="XXXX")
     table = pq.read_table("file.parquet", filesystem=abfs)
+
+Parquet Modular Encryption (Columnar Encryption)
+------------------------------------------------
+
+Columnar encryption is supported for Parquet files starting from
+Apache Arrow 4.0.0.
+
+Parquet uses the envelope encryption practice, where file parts are encrypted
+with "data encryption keys" (DEKs), and the DEKs are encrypted with "master
+encryption keys" (MEKs). The DEKs are randomly generated by Parquet for each
+encrypted file/column. The MEKs are generated, stored and managed in a Key
+Management Service (KMS) of user’s choice.
+
+Reading and writing encrypted parquet files involves passing file encryption
+and decryption properties to :class:`~pyarrow.parquet.ParquetWriter` and to
+:class:`~.ParquetFile`, respectively.
+
+Writing an encrypted parquet:
+
+.. code-block:: python
+
+   encryption_properties = crypto_factory.file_encryption_properties(
+                                    kms_connection_config, encryption_config)
+   with pq.ParquetWriter(filename, schema,
+                        encryption_properties=encryption_properties) as writer:
+      writer.write_table(table)
+
+Reading an encrypted parquet:
+
+.. code-block:: python
+
+   decryption_properties = crypto_factory.file_decryption_properties(
+                                                    kms_connection_config)
+   parquet_file = pq.ParquetFile(filename,
+                              decryption_properties=decryption_properties)

Review comment:
       ```suggestion
                                    decryption_properties=decryption_properties)
   ```

##########
File path: docs/source/python/parquet.rst
##########
@@ -604,3 +604,171 @@ One example is Azure Blob storage, which can be 
interfaced through the
 
     abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", 
container_name="XXXX")
     table = pq.read_table("file.parquet", filesystem=abfs)
+
+Parquet Modular Encryption (Columnar Encryption)
+------------------------------------------------
+
+Columnar encryption is supported for Parquet files starting from
+Apache Arrow 4.0.0.
+
+Parquet uses the envelope encryption practice, where file parts are encrypted
+with "data encryption keys" (DEKs), and the DEKs are encrypted with "master
+encryption keys" (MEKs). The DEKs are randomly generated by Parquet for each
+encrypted file/column. The MEKs are generated, stored and managed in a Key
+Management Service (KMS) of user’s choice.
+
+Reading and writing encrypted parquet files involves passing file encryption
+and decryption properties to :class:`~pyarrow.parquet.ParquetWriter` and to
+:class:`~.ParquetFile`, respectively.
+
+Writing an encrypted parquet:
+
+.. code-block:: python
+
+   encryption_properties = crypto_factory.file_encryption_properties(
+                                    kms_connection_config, encryption_config)
+   with pq.ParquetWriter(filename, schema,
+                        encryption_properties=encryption_properties) as writer:
+      writer.write_table(table)
+
+Reading an encrypted parquet:
+
+.. code-block:: python
+
+   decryption_properties = crypto_factory.file_decryption_properties(
+                                                    kms_connection_config)
+   parquet_file = pq.ParquetFile(filename,
+                              decryption_properties=decryption_properties)
+
+
+In order to create the encryption and decryption properties, a 
``CryptoFactory``
+should be created and initialized with KMS Client details, as described below.
+
+
+KMS Client
+~~~~~~~~~~
+
+The master encryption keys must be kept and managed in a production-grade KMS
+system, deployed in user's organization. Using Parquet encryption requires
+implementation of a client class for the KMS server.
+Any KmsClient implementation should implement the following informal interface:
+
+.. code-block:: python
+
+   class KmsClient:
+   def wrap_key(self, key_bytes, master_key_identifier):
+      """Wrap a key - encrypt it with the master key."""
+       raise NotImplementedError()
+
+   def unwrap_key(self, wrapped_key, master_key_identifier):
+      """Unwrap a key - decrypt it with the master key."""
+      raise NotImplementedError()
+
+An example KmsClient impelementation might look like the following:
+
+.. code-block:: python
+   class MyKmsClient(pq.KmsClient):

Review comment:
       ```suggestion
   .. code-block:: python
   
      class MyKmsClient(pq.KmsClient):
   ```

##########
File path: docs/source/python/parquet.rst
##########
@@ -604,3 +604,171 @@ One example is Azure Blob storage, which can be 
interfaced through the
 
     abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", 
container_name="XXXX")
     table = pq.read_table("file.parquet", filesystem=abfs)
+
+Parquet Modular Encryption (Columnar Encryption)
+------------------------------------------------
+
+Columnar encryption is supported for Parquet files starting from
+Apache Arrow 4.0.0.
+
+Parquet uses the envelope encryption practice, where file parts are encrypted
+with "data encryption keys" (DEKs), and the DEKs are encrypted with "master
+encryption keys" (MEKs). The DEKs are randomly generated by Parquet for each
+encrypted file/column. The MEKs are generated, stored and managed in a Key
+Management Service (KMS) of user’s choice.
+
+Reading and writing encrypted parquet files involves passing file encryption
+and decryption properties to :class:`~pyarrow.parquet.ParquetWriter` and to
+:class:`~.ParquetFile`, respectively.
+
+Writing an encrypted parquet:
+
+.. code-block:: python
+
+   encryption_properties = crypto_factory.file_encryption_properties(
+                                    kms_connection_config, encryption_config)
+   with pq.ParquetWriter(filename, schema,
+                        encryption_properties=encryption_properties) as writer:
+      writer.write_table(table)
+
+Reading an encrypted parquet:
+
+.. code-block:: python
+
+   decryption_properties = crypto_factory.file_decryption_properties(
+                                                    kms_connection_config)
+   parquet_file = pq.ParquetFile(filename,
+                              decryption_properties=decryption_properties)
+
+
+In order to create the encryption and decryption properties, a 
``CryptoFactory``
+should be created and initialized with KMS Client details, as described below.
+
+
+KMS Client
+~~~~~~~~~~
+
+The master encryption keys must be kept and managed in a production-grade KMS
+system, deployed in user's organization. Using Parquet encryption requires
+implementation of a client class for the KMS server.
+Any KmsClient implementation should implement the following informal interface:
+
+.. code-block:: python
+
+   class KmsClient:
+   def wrap_key(self, key_bytes, master_key_identifier):

Review comment:
       Need to add some indentation here

##########
File path: docs/source/python/parquet.rst
##########
@@ -604,3 +604,171 @@ One example is Azure Blob storage, which can be 
interfaced through the
 
     abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", 
container_name="XXXX")
     table = pq.read_table("file.parquet", filesystem=abfs)
+
+Parquet Modular Encryption (Columnar Encryption)
+------------------------------------------------
+
+Columnar encryption is supported for Parquet files starting from
+Apache Arrow 4.0.0.
+
+Parquet uses the envelope encryption practice, where file parts are encrypted
+with "data encryption keys" (DEKs), and the DEKs are encrypted with "master
+encryption keys" (MEKs). The DEKs are randomly generated by Parquet for each
+encrypted file/column. The MEKs are generated, stored and managed in a Key
+Management Service (KMS) of user’s choice.
+
+Reading and writing encrypted parquet files involves passing file encryption
+and decryption properties to :class:`~pyarrow.parquet.ParquetWriter` and to
+:class:`~.ParquetFile`, respectively.
+
+Writing an encrypted parquet:
+
+.. code-block:: python
+
+   encryption_properties = crypto_factory.file_encryption_properties(
+                                    kms_connection_config, encryption_config)
+   with pq.ParquetWriter(filename, schema,
+                        encryption_properties=encryption_properties) as writer:
+      writer.write_table(table)
+
+Reading an encrypted parquet:
+
+.. code-block:: python
+
+   decryption_properties = crypto_factory.file_decryption_properties(
+                                                    kms_connection_config)
+   parquet_file = pq.ParquetFile(filename,
+                              decryption_properties=decryption_properties)
+
+
+In order to create the encryption and decryption properties, a 
``CryptoFactory``
+should be created and initialized with KMS Client details, as described below.
+
+
+KMS Client
+~~~~~~~~~~
+
+The master encryption keys must be kept and managed in a production-grade KMS
+system, deployed in user's organization. Using Parquet encryption requires
+implementation of a client class for the KMS server.
+Any KmsClient implementation should implement the following informal interface:
+
+.. code-block:: python
+
+   class KmsClient:
+   def wrap_key(self, key_bytes, master_key_identifier):
+      """Wrap a key - encrypt it with the master key."""
+       raise NotImplementedError()
+
+   def unwrap_key(self, wrapped_key, master_key_identifier):
+      """Unwrap a key - decrypt it with the master key."""
+      raise NotImplementedError()
+
+An example KmsClient impelementation might look like the following:
+
+.. code-block:: python
+   class MyKmsClient(pq.KmsClient):
+      def __init__(self, kms_connection_configuration):
+         pq.KmsClient.__init__(self)

Review comment:
       The subclassing from pq.KmsClient is required, right? In that case, I 
would also add it to the minimal code snippet laying out the required methods 
above (or maybe combine the two code blocks into one)

##########
File path: docs/source/python/parquet.rst
##########
@@ -604,3 +604,171 @@ One example is Azure Blob storage, which can be 
interfaced through the
 
     abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", 
container_name="XXXX")
     table = pq.read_table("file.parquet", filesystem=abfs)
+
+Parquet Modular Encryption (Columnar Encryption)
+------------------------------------------------
+
+Columnar encryption is supported for Parquet files starting from
+Apache Arrow 4.0.0.
+
+Parquet uses the envelope encryption practice, where file parts are encrypted
+with "data encryption keys" (DEKs), and the DEKs are encrypted with "master
+encryption keys" (MEKs). The DEKs are randomly generated by Parquet for each
+encrypted file/column. The MEKs are generated, stored and managed in a Key
+Management Service (KMS) of user’s choice.
+
+Reading and writing encrypted parquet files involves passing file encryption
+and decryption properties to :class:`~pyarrow.parquet.ParquetWriter` and to
+:class:`~.ParquetFile`, respectively.
+
+Writing an encrypted parquet:
+
+.. code-block:: python
+
+   encryption_properties = crypto_factory.file_encryption_properties(
+                                    kms_connection_config, encryption_config)
+   with pq.ParquetWriter(filename, schema,
+                        encryption_properties=encryption_properties) as writer:
+      writer.write_table(table)
+
+Reading an encrypted parquet:
+
+.. code-block:: python
+
+   decryption_properties = crypto_factory.file_decryption_properties(
+                                                    kms_connection_config)
+   parquet_file = pq.ParquetFile(filename,
+                              decryption_properties=decryption_properties)
+
+
+In order to create the encryption and decryption properties, a 
``CryptoFactory``
+should be created and initialized with KMS Client details, as described below.
+
+
+KMS Client
+~~~~~~~~~~
+
+The master encryption keys must be kept and managed in a production-grade KMS
+system, deployed in user's organization. Using Parquet encryption requires
+implementation of a client class for the KMS server.
+Any KmsClient implementation should implement the following informal interface:
+
+.. code-block:: python
+
+   class KmsClient:
+   def wrap_key(self, key_bytes, master_key_identifier):
+      """Wrap a key - encrypt it with the master key."""
+       raise NotImplementedError()
+
+   def unwrap_key(self, wrapped_key, master_key_identifier):
+      """Unwrap a key - decrypt it with the master key."""
+      raise NotImplementedError()
+
+An example KmsClient impelementation might look like the following:

Review comment:
       ```suggestion
   An example KmsClient implementation might look like the following:
   ```

##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -1452,3 +1505,424 @@ cdef class ParquetWriter(_Weakrefable):
             return result
         raise RuntimeError(
             'file metadata is only available after writer close')
+
+cdef class EncryptionConfiguration(_Weakrefable):
+    cdef:
+        shared_ptr[CEncryptionConfiguration] configuration
+
+    # Avoid mistakingly creating attributes
+    __slots__ = ()
+
+    def __init__(self, footer_key, *, column_keys=None,
+                 uniform_encryption=None, encryption_algorithm=None,
+                 plaintext_footer=None, double_wrapping=None,
+                 cache_lifetime=None, internal_key_material=None,
+                 data_key_length_bits=None):
+        self.configuration.reset(
+            new CEncryptionConfiguration(tobytes(footer_key)))
+        if column_keys is not None:
+            self.column_keys = column_keys
+        if uniform_encryption is not None:
+            self.uniform_encryption = uniform_encryption
+        if encryption_algorithm is not None:
+            self.encryption_algorithm = encryption_algorithm
+        if plaintext_footer is not None:
+            self.plaintext_footer = plaintext_footer
+        if double_wrapping is not None:
+            self.double_wrapping = double_wrapping
+        if cache_lifetime is not None:
+            self.cache_lifetime = cache_lifetime
+        if internal_key_material is not None:
+            self.internal_key_material = internal_key_material
+        if data_key_length_bits is not None:
+            self.data_key_length_bits = data_key_length_bits
+
+    @property
+    def footer_key(self):
+        """ID of the master key for footer encryption/signing"""
+        return frombytes(self.configuration.get().footer_key)
+
+    @property
+    def column_keys(self):
+        """
+        List of columns to encrypt, with master key IDs.
+        """
+        column_keys_str = frombytes(self.configuration.get().column_keys)
+        # Convert from "masterKeyID:colName,colName;masterKeyID:colName..."
+        # (see HIVE-21848) to dictionary of master key ID to column name lists
+        column_keys_to_key_list_str = dict(subString.replace(" ", "").split(
+            ":") for subString in column_keys_str.split(";"))
+        column_keys_dict = {k: v.split(
+            ",") for k, v in column_keys_to_key_list_str.items()}
+        return column_keys_dict
+
+    @column_keys.setter
+    def column_keys(self, dict value):
+        if value is not None:
+            # convert a dictionary such as
+            # '{"key1": ["col1 ", "col2"], "key2": ["col3 ", "col4"]}''
+            # to the string defined by the spec
+            # 'key1: col1 , col2; key2: col3 , col4'
+            column_keys = "; ".join(
+                ["{}: {}".format(k, ", ".join(v)) for k, v in value.items()])
+            self.configuration.get().column_keys = tobytes(column_keys)
+
+    @property
+    def uniform_encryption(self):
+        """Encrypt footer and all columns with the same encryption key."""
+        return self.configuration.get().uniform_encryption
+
+    @uniform_encryption.setter
+    def uniform_encryption(self, value):
+        self.configuration.get().uniform_encryption = value
+
+    @property
+    def encryption_algorithm(self):
+        """Parquet encryption algorithm.
+        Can be "AES_GCM_V1" (default), or "AES_GCM_CTR_V1"."""
+        return cipher_to_name(self.configuration.get().encryption_algorithm)
+
+    @encryption_algorithm.setter
+    def encryption_algorithm(self, value):
+        cipher = cipher_from_name(value)
+        self.configuration.get().encryption_algorithm = cipher
+
+    @property
+    def plaintext_footer(self):
+        """Write files with plaintext footer."""
+        return self.configuration.get().plaintext_footer
+
+    @plaintext_footer.setter
+    def plaintext_footer(self, value):
+        self.configuration.get().plaintext_footer = value
+
+    @property
+    def double_wrapping(self):
+        """Use double wrapping - where data encryption keys (DEKs) are
+        encrypted with key encryption keys (KEKs), which in turn are
+        encrypted with master keys.
+        If set to false, use single wrapping - where DEKs are
+        encrypted directly with master keys."""
+        return self.configuration.get().double_wrapping
+
+    @double_wrapping.setter
+    def double_wrapping(self, value):
+        self.configuration.get().double_wrapping = value
+
+    @property
+    def cache_lifetime(self):
+        """Lifetime of cached entities (key encryption keys,
+        local wrapping keys, KMS client objects)."""
+        return timedelta(
+            seconds=self.configuration.get().cache_lifetime_seconds)
+
+    @cache_lifetime.setter
+    def cache_lifetime(self, value):
+        if not isinstance(value, timedelta):
+            raise TypeError("cache_lifetime should be a timedelta")
+        self.configuration.get().cache_lifetime_seconds = value.total_seconds()
+
+    @property
+    def internal_key_material(self):
+        """Store key material inside Parquet file footers; this mode doesn’t
+        produce additional files. If set to false, key material is stored in
+        separate files in the same folder, which enables key rotation for
+        immutable Parquet files."""
+        return self.configuration.get().internal_key_material
+
+    @internal_key_material.setter
+    def internal_key_material(self, value):
+        self.configuration.get().internal_key_material = value
+
+    @property
+    def data_key_length_bits(self):
+        """Length of data encryption keys (DEKs), randomly generated by 
parquet key
+        management tools. Can be 128, 192 or 256 bits."""
+        return self.configuration.get().data_key_length_bits
+
+    @data_key_length_bits.setter
+    def data_key_length_bits(self, value):
+        self.configuration.get().data_key_length_bits = value
+
+    cdef inline shared_ptr[CEncryptionConfiguration] unwrap(self) nogil:
+        return self.configuration
+
+cdef class DecryptionConfiguration(_Weakrefable):
+    cdef:
+        shared_ptr[CDecryptionConfiguration] configuration
+
+    # Avoid mistakingly creating attributes
+    __slots__ = ()
+
+    def __init__(self, *, cache_lifetime=None):
+        self.configuration.reset(new CDecryptionConfiguration())
+
+    @property
+    def cache_lifetime(self):
+        """Lifetime of cached entities (key encryption keys,
+        local wrapping keys, KMS client objects)."""
+        return timedelta(
+            seconds=self.configuration.get().cache_lifetime_seconds)
+
+    @cache_lifetime.setter
+    def cache_lifetime(self, value):
+        self.configuration.get().cache_lifetime_seconds = value.total_seconds()
+
+    cdef inline shared_ptr[CDecryptionConfiguration] unwrap(self) nogil:
+        return self.configuration
+
+
+cdef class KmsConnectionConfig(_Weakrefable):

Review comment:
       Can you give those classes that are exposed publicly a docstring?

##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,603 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import base64
+import pytest
+from datetime import timedelta
+
+import pyarrow as pa
+try:
+    import pyarrow.parquet as pq
+except ImportError:
+    pq = None
+
+
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = b"0123456789112345"
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = b"1234567890123450"
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+    footer_key=FOOTER_KEY_NAME,
+    column_keys={
+        COL_KEY_NAME: ["a", "b"],
+    },
+)
+
+
+@pytest.fixture(scope='module')
+def data_table():
+    data_table = pa.Table.from_pydict({
+        'a': pa.array([1, 2, 3]),
+        'b': pa.array(['a', 'b', 'c']),
+        'c': pa.array(['x', 'y', 'z'])
+    })
+    return data_table
+
+
+class InMemoryKmsClient(pq.KmsClient):
+    """This is a mock class implementation of KmsClient, built for testing 
only.
+    """
+
+    def __init__(self, config):
+        """Create an InMemoryKmsClient instance."""
+        pq.KmsClient.__init__(self)
+        self.master_keys_map = config.custom_kms_conf
+
+    def wrap_key(self, key_bytes, master_key_identifier):
+        """Not a secure cipher - the wrapped key
+        is just the master key concatenated with key bytes"""
+        master_key_bytes = self.master_keys_map[master_key_identifier].encode(
+            'utf-8')
+        wrapped_key = b"".join([master_key_bytes, key_bytes])
+        result = base64.b64encode(wrapped_key)
+        return result
+
+    def unwrap_key(self, wrapped_key, master_key_identifier):
+        """Not a secure cipher - just extract the key from
+        the wrapped key"""
+        expected_master_key = self.master_keys_map[master_key_identifier]
+        decoded_wrapped_key = base64.b64decode(wrapped_key)
+        master_key_bytes = decoded_wrapped_key[:16]
+        decrypted_key = decoded_wrapped_key[16:]
+        if (expected_master_key == master_key_bytes.decode('utf-8')):
+            return decrypted_key
+        raise ValueError("Incorrect master key used",
+                         master_key_bytes, decrypted_key)
+
+
+def verify_file_encrypted(path):
+    """Verify that the file is encrypted by looking at its first 4 bytes.
+    If it's the magic string PARE
+    then this is a parquet with encrypted footer."""
+    with open(path, "rb") as file:
+        magic_str = file.read(4)
+        # Verify magic string for parquet with encrypted footer is PARE
+        assert(magic_str == b'PARE')
+
+
+@pytest.mark.parquet

Review comment:
       You can put a `pytestmark = pytest.mark.parquet` at the top of the file 
(after the imports), then you don't have to mark each test separately

##########
File path: python/pyarrow/tests/parquet/test_parquet_encryption.py
##########
@@ -0,0 +1,603 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import base64
+import pytest
+from datetime import timedelta
+
+import pyarrow as pa
+try:
+    import pyarrow.parquet as pq
+except ImportError:
+    pq = None
+
+
+PARQUET_NAME = 'encrypted_table.in_mem.parquet'
+FOOTER_KEY = b"0123456789112345"
+FOOTER_KEY_NAME = "footer_key"
+COL_KEY = b"1234567890123450"
+COL_KEY_NAME = "col_key"
+BASIC_ENCRYPTION_CONFIG = pq.EncryptionConfiguration(
+    footer_key=FOOTER_KEY_NAME,
+    column_keys={
+        COL_KEY_NAME: ["a", "b"],
+    },
+)
+
+
+@pytest.fixture(scope='module')
+def data_table():
+    data_table = pa.Table.from_pydict({
+        'a': pa.array([1, 2, 3]),
+        'b': pa.array(['a', 'b', 'c']),
+        'c': pa.array(['x', 'y', 'z'])
+    })
+    return data_table
+
+
+class InMemoryKmsClient(pq.KmsClient):
+    """This is a mock class implementation of KmsClient, built for testing 
only.
+    """
+
+    def __init__(self, config):
+        """Create an InMemoryKmsClient instance."""
+        pq.KmsClient.__init__(self)
+        self.master_keys_map = config.custom_kms_conf
+
+    def wrap_key(self, key_bytes, master_key_identifier):
+        """Not a secure cipher - the wrapped key
+        is just the master key concatenated with key bytes"""
+        master_key_bytes = self.master_keys_map[master_key_identifier].encode(
+            'utf-8')
+        wrapped_key = b"".join([master_key_bytes, key_bytes])
+        result = base64.b64encode(wrapped_key)
+        return result
+
+    def unwrap_key(self, wrapped_key, master_key_identifier):
+        """Not a secure cipher - just extract the key from
+        the wrapped key"""
+        expected_master_key = self.master_keys_map[master_key_identifier]
+        decoded_wrapped_key = base64.b64decode(wrapped_key)
+        master_key_bytes = decoded_wrapped_key[:16]
+        decrypted_key = decoded_wrapped_key[16:]
+        if (expected_master_key == master_key_bytes.decode('utf-8')):
+            return decrypted_key
+        raise ValueError("Incorrect master key used",
+                         master_key_bytes, decrypted_key)
+
+
+def verify_file_encrypted(path):
+    """Verify that the file is encrypted by looking at its first 4 bytes.
+    If it's the magic string PARE
+    then this is a parquet with encrypted footer."""
+    with open(path, "rb") as file:
+        magic_str = file.read(4)
+        # Verify magic string for parquet with encrypted footer is PARE
+        assert(magic_str == b'PARE')
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_read(tempdir, data_table):
+    """Write an encrypted parquet, verify it's encrypted, and then read it."""
+    path = tempdir / PARQUET_NAME
+
+    # Encrypt the footer with the footer key,
+    # encrypt column `a` and column `b` with another key,
+    # keep `c` plaintext
+    encryption_config = pq.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME,
+        column_keys={
+            COL_KEY_NAME: ["a", "b"],
+        },
+        encryption_algorithm="AES_GCM_V1",
+        cache_lifetime=timedelta(minutes=5.0),
+        data_key_length_bits=256)
+
+    kms_connection_config = pq.KmsConnectionConfig(
+        custom_kms_conf={
+            FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+            COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+        }
+    )
+
+    def kms_factory(kms_connection_configuration):
+        return InMemoryKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    # Write with encryption properties
+    write_encrypted_parquet(path, data_table, encryption_config,
+                            kms_connection_config, crypto_factory)
+    verify_file_encrypted(path)
+
+    # Read with decryption properties
+    decryption_config = pq.DecryptionConfiguration(
+        cache_lifetime=timedelta(minutes=5.0))
+    result_table = read_encrypted_parquet(
+        path, decryption_config, kms_connection_config, crypto_factory)
+    assert data_table.equals(result_table)
+
+
+def write_encrypted_parquet(path, table, encryption_config,
+                            kms_connection_config, crypto_factory):
+    file_encryption_properties = crypto_factory.file_encryption_properties(
+        kms_connection_config, encryption_config)
+    assert(file_encryption_properties is not None)
+    with pq.ParquetWriter(
+            path, table.schema,
+            encryption_properties=file_encryption_properties) as writer:
+        writer.write_table(table)
+
+
+def read_encrypted_parquet(path, decryption_config,
+                           kms_connection_config, crypto_factory):
+    file_decryption_properties = crypto_factory.file_decryption_properties(
+        kms_connection_config, decryption_config)
+    assert(file_decryption_properties is not None)
+    meta = pq.read_metadata(
+        path, decryption_properties=file_decryption_properties)
+    assert(meta.num_columns == 3)
+    schema = pq.read_schema(
+        path, decryption_properties=file_decryption_properties)
+    assert(len(schema.names) == 3)
+
+    result = pq.ParquetFile(
+        path, decryption_properties=file_decryption_properties)
+    return result.read(use_threads=False)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_read_wrong_key(tempdir, data_table):
+    """Write an encrypted parquet, verify it's encrypted,
+    and then read it using wrong keys."""
+    path = tempdir / PARQUET_NAME
+
+    # Encrypt the footer with the footer key,
+    # encrypt column `a` and column `b` with another key,
+    # keep `c` plaintext
+    encryption_config = pq.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME,
+        column_keys={
+            COL_KEY_NAME: ["a", "b"],
+        },
+        encryption_algorithm="AES_GCM_V1",
+        cache_lifetime=timedelta(minutes=5.0),
+        data_key_length_bits=256)
+
+    kms_connection_config = pq.KmsConnectionConfig(
+        custom_kms_conf={
+            FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+            COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+        }
+    )
+
+    def kms_factory(kms_connection_configuration):
+        return InMemoryKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    # Write with encryption properties
+    write_encrypted_parquet(path, data_table, encryption_config,
+                            kms_connection_config, crypto_factory)
+    verify_file_encrypted(path)
+
+    # Read with decryption properties
+    wrong_kms_connection_config = pq.KmsConnectionConfig(
+        custom_kms_conf={
+            # Wrong keys - mixup in names
+            FOOTER_KEY_NAME: COL_KEY.decode("UTF-8"),
+            COL_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+        }
+    )
+    decryption_config = pq.DecryptionConfiguration(
+        cache_lifetime=timedelta(minutes=5.0))
+    with pytest.raises(ValueError, match=r"Incorrect master key used"):
+        read_encrypted_parquet(
+            path, decryption_config, wrong_kms_connection_config,
+            crypto_factory)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_read_no_decryption_config(tempdir, data_table):
+    """Write an encrypted parquet, verify it's encrypted,
+    but then try to read it without decryption properties."""
+    test_encrypted_parquet_write_read(tempdir, data_table)
+    # Read without decryption properties
+    with pytest.raises(IOError, match=r"no decryption"):
+        pq.ParquetFile(tempdir / PARQUET_NAME).read()
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_read_metadata_no_decryption_config(
+        tempdir, data_table):
+    """Write an encrypted parquet, verify it's encrypted,
+    but then try to read its metadata without decryption properties."""
+    test_encrypted_parquet_write_read(tempdir, data_table)
+    # Read metadata without decryption properties
+    with pytest.raises(IOError, match=r"no decryption"):
+        pq.read_metadata(tempdir / PARQUET_NAME)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_read_schema_no_decryption_config(
+        tempdir, data_table):
+    """Write an encrypted parquet, verify it's encrypted,
+    but then try to read its schema without decryption properties."""
+    test_encrypted_parquet_write_read(tempdir, data_table)
+    with pytest.raises(IOError, match=r"no decryption"):
+        pq.read_schema(tempdir / PARQUET_NAME)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_no_col_key(tempdir, data_table):
+    """Write an encrypted parquet, but give only footer key,
+    without column key."""
+    path = tempdir / 'encrypted_table_no_col_key.in_mem.parquet'
+
+    # Encrypt the footer with the footer key
+    encryption_config = pq.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME)
+
+    kms_connection_config = pq.KmsConnectionConfig(
+        custom_kms_conf={
+            FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+            COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+        }
+    )
+
+    def kms_factory(kms_connection_configuration):
+        return InMemoryKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    with pytest.raises(OSError, match=r"Either column_keys or 
uniform_encryption must be set"):
+        # Write with encryption properties
+        write_encrypted_parquet(path, data_table, encryption_config,
+                                kms_connection_config, crypto_factory)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_kms_error(tempdir, data_table):
+    """Write an encrypted parquet, but raise KeyError in KmsClient."""
+    path = tempdir / 'encrypted_table_kms_error.in_mem.parquet'
+    encryption_config = BASIC_ENCRYPTION_CONFIG
+
+    # Empty master_keys_map
+    kms_connection_config = pq.KmsConnectionConfig()
+
+    def kms_factory(kms_connection_configuration):
+        # Empty master keys map will cause KeyError to be raised
+        # on wrap/unwrap calls
+        return InMemoryKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    with pytest.raises(KeyError, match="footer_key"):
+        # Write with encryption properties
+        write_encrypted_parquet(path, data_table, encryption_config,
+                                kms_connection_config, crypto_factory)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_kms_specific_error(tempdir, data_table):
+    """Write an encrypted parquet, but raise KeyError in KmsClient."""
+    path = tempdir / 'encrypted_table_kms_error.in_mem.parquet'
+    encryption_config = BASIC_ENCRYPTION_CONFIG
+
+    # Empty master_keys_map
+    kms_connection_config = pq.KmsConnectionConfig()
+
+    class ThrowingKmsClient(pq.KmsClient):
+        """A KmsClient implementation that throws exception in
+        wrap/unwrap calls
+        """
+
+        def __init__(self, config):
+            """Create an InMemoryKmsClient instance."""
+            pq.KmsClient.__init__(self)
+            self.config = config
+
+        def wrap_key(self, key_bytes, master_key_identifier):
+            raise ValueError("Cannot Wrap Key")
+
+        def unwrap_key(self, wrapped_key, master_key_identifier):
+            raise ValueError("Cannot Unwrap Key")
+
+    def kms_factory(kms_connection_configuration):
+        # Exception thrown in wrap/unwrap calls
+        return ThrowingKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    with pytest.raises(ValueError, match="Cannot Wrap Key"):
+        # Write with encryption properties
+        write_encrypted_parquet(path, data_table, encryption_config,
+                                kms_connection_config, crypto_factory)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_kms_factory_error(tempdir, data_table):
+    """Write an encrypted parquet, but raise ValueError in kms_factory."""
+    path = tempdir / 'encrypted_table_kms_factory_error.in_mem.parquet'
+    encryption_config = BASIC_ENCRYPTION_CONFIG
+
+    # Empty master_keys_map
+    kms_connection_config = pq.KmsConnectionConfig()
+
+    def kms_factory(kms_connection_configuration):
+        raise ValueError('Cannot create KmsClient')
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    with pytest.raises(ValueError,
+                       match="Cannot create KmsClient"):
+        # Write with encryption properties
+        write_encrypted_parquet(path, data_table, encryption_config,
+                                kms_connection_config, crypto_factory)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_kms_factory_type_error(tempdir, data_table):
+    """Write an encrypted parquet, but use wrong KMS client type
+    that doesn't implement KmsClient."""
+    path = tempdir / 'encrypted_table_kms_factory_error.in_mem.parquet'
+    encryption_config = BASIC_ENCRYPTION_CONFIG
+
+    # Empty master_keys_map
+    kms_connection_config = pq.KmsConnectionConfig()
+
+    class WrongTypeKmsClient():
+        """This is not an implementation of KmsClient.
+        """
+
+        def __init__(self, config):
+            self.master_keys_map = config.custom_kms_conf
+
+        def wrap_key(self, key_bytes, master_key_identifier):
+            return None
+
+        def unwrap_key(self, wrapped_key, master_key_identifier):
+            return None
+
+    def kms_factory(kms_connection_configuration):
+        return WrongTypeKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    with pytest.raises(TypeError):
+        # Write with encryption properties
+        write_encrypted_parquet(path, data_table, encryption_config,
+                                kms_connection_config, crypto_factory)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_encryption_configuration():
+    def validate_encryption_configuration(encryption_config):
+        assert(FOOTER_KEY_NAME == encryption_config.footer_key)
+        assert(["a", "b"] == encryption_config.column_keys[COL_KEY_NAME])
+        assert("AES_GCM_CTR_V1" == encryption_config.encryption_algorithm)
+        assert(encryption_config.plaintext_footer)
+        assert(not encryption_config.double_wrapping)
+        assert(timedelta(minutes=10.0) == encryption_config.cache_lifetime)
+        assert(not encryption_config.internal_key_material)
+        assert(192 == encryption_config.data_key_length_bits)
+
+    encryption_config = pq.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME,
+        column_keys={COL_KEY_NAME: ["a", "b"], },
+        encryption_algorithm="AES_GCM_CTR_V1",
+        plaintext_footer=True,
+        double_wrapping=False,
+        cache_lifetime=timedelta(minutes=10.0),
+        internal_key_material=False,
+        data_key_length_bits=192,
+    )
+    validate_encryption_configuration(encryption_config)
+
+    encryption_config_1 = pq.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME)
+    encryption_config_1.column_keys = {COL_KEY_NAME: ["a", "b"], }
+    encryption_config_1.encryption_algorithm = "AES_GCM_CTR_V1"
+    encryption_config_1.plaintext_footer = True
+    encryption_config_1.double_wrapping = False
+    encryption_config_1.cache_lifetime = timedelta(minutes=10.0)
+    encryption_config_1.internal_key_material = False
+    encryption_config_1.data_key_length_bits = 192
+    validate_encryption_configuration(encryption_config_1)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_decryption_configuration():
+    decryption_config = pq.DecryptionConfiguration(
+        cache_lifetime=timedelta(minutes=10.0))
+    assert(timedelta(minutes=10.0) == decryption_config.cache_lifetime)
+
+    decryption_config_1 = pq.DecryptionConfiguration()
+    decryption_config_1.cache_lifetime = timedelta(minutes=10.0)
+    assert(timedelta(minutes=10.0) == decryption_config_1.cache_lifetime)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_kms_configuration():
+    def validate_kms_connection_config(kms_connection_config):
+        assert("Instance1" == kms_connection_config.kms_instance_id)
+        assert("URL1" == kms_connection_config.kms_instance_url)
+        assert("MyToken" == kms_connection_config.key_access_token)
+        assert({"key1": "key_material_1", "key2": "key_material_2"} ==
+               kms_connection_config.custom_kms_conf)
+
+    kms_connection_config = pq.KmsConnectionConfig(
+        kms_instance_id="Instance1",
+        kms_instance_url="URL1",
+        key_access_token="MyToken",
+        custom_kms_conf={
+            "key1": "key_material_1",
+            "key2": "key_material_2",
+        })
+    validate_kms_connection_config(kms_connection_config)
+
+    kms_connection_config_1 = pq.KmsConnectionConfig()
+    kms_connection_config_1.kms_instance_id = "Instance1"
+    kms_connection_config_1.kms_instance_url = "URL1"
+    kms_connection_config_1.key_access_token = "MyToken"
+    kms_connection_config_1.custom_kms_conf = {
+        "key1": "key_material_1",
+        "key2": "key_material_2",
+    }
+    validate_kms_connection_config(kms_connection_config_1)
+
+
+@pytest.mark.parquet
+def test_encrypted_parquet_write_read_uniform_ctr(tempdir, data_table):
+    """Write an encrypted parquet, with uniform encryption
+    and GCM_CTR encryption algorithm,
+    verify it's encrypted, and then read it."""
+    path = tempdir / PARQUET_NAME
+
+    # Encrypt the file with the footer key
+    encryption_config = pq.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME,
+        column_keys={},
+        uniform_encryption=True,
+        encryption_algorithm="AES_GCM_CTR_V1")
+
+    kms_connection_config = pq.KmsConnectionConfig(
+        custom_kms_conf={FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8")}
+    )
+
+    def kms_factory(kms_connection_configuration):
+        return InMemoryKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    # Write with encryption properties
+    write_encrypted_parquet(path, data_table, encryption_config,
+                            kms_connection_config, crypto_factory)
+    verify_file_encrypted(path)
+
+    # Read with decryption properties
+    decryption_config = pq.DecryptionConfiguration()
+    result_table = read_encrypted_parquet(
+        path, decryption_config, kms_connection_config, crypto_factory)
+    assert data_table.equals(result_table)
+
+
+@pytest.mark.parquet
+@pytest.mark.xfail(reason="Plaintext footer - reading plaintext column subset"
+                   " reads encrypted columns too")
+def test_encrypted_parquet_write_read_plain_footer_single_wrapping(
+        tempdir, data_table):
+    """Write an encrypted parquet, with plaintext footer
+    and with single wrapping,
+    verify it's encrypted, and then read plaintext columns."""
+    path = tempdir / PARQUET_NAME
+
+    # Encrypt the footer with the footer key,
+    # encrypt column `a` and column `b` with another key,
+    # keep `c` plaintext
+    encryption_config = pq.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME,
+        column_keys={
+            COL_KEY_NAME: ["a", "b"],
+        },
+        plaintext_footer=True,
+        double_wrapping=False)
+
+    kms_connection_config = pq.KmsConnectionConfig(
+        custom_kms_conf={
+            FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
+            COL_KEY_NAME: COL_KEY.decode("UTF-8"),
+        }
+    )
+
+    def kms_factory(kms_connection_configuration):
+        return InMemoryKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    # Write with encryption properties
+    write_encrypted_parquet(path, data_table, encryption_config,
+                            kms_connection_config, crypto_factory)
+
+    # # Read without decryption properties only the plaintext column
+    # result = pq.ParquetFile(path)
+    # result_table = result.read(columns='c', use_threads=False)
+    # assert table.num_rows == result_table.num_rows
+
+
+@pytest.mark.parquet
+@pytest.mark.xfail(reason="External key material not supported yet")
+def test_encrypted_parquet_write_external(tempdir, data_table):
+    """Write an encrypted parquet, with external key
+    material.
+    Currently it's not implemented, so should throw
+    an exception"""
+    path = tempdir / PARQUET_NAME
+
+    # Encrypt the file with the footer key
+    encryption_config = pq.EncryptionConfiguration(
+        footer_key=FOOTER_KEY_NAME,
+        column_keys={},
+        uniform_encryption=True,
+        internal_key_material=False)
+
+    kms_connection_config = pq.KmsConnectionConfig(
+        custom_kms_conf={FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8")}
+    )
+
+    def kms_factory(kms_connection_configuration):
+        return InMemoryKmsClient(kms_connection_configuration)
+
+    crypto_factory = pq.CryptoFactory(kms_factory)
+    # Write with encryption properties
+    write_encrypted_parquet(path, data_table, encryption_config,
+                            kms_connection_config, crypto_factory)
+
+
+@pytest.mark.parquet
+@pytest.mark.skip(reason="Multithreaded read sometimes fails decryption"
+                  " finalization and sometimes with Segmentation fault")

Review comment:
       Is there already a JIRA for this?

##########
File path: python/examples/parquet_encryption/sample_vault_kms_client.py
##########
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""A sample KmsClient implementation."""
+import argparse
+import base64
+import os
+
+import requests
+
+import pyarrow as pa
+try:
+    import pyarrow.parquet as pq
+except ImportError:
+    pq = None
+
+
+class VaultClient(pq.KmsClient):
+    """An example of a KmsClient implementation with master keys
+    managed by Hashicorp Vault KMS.

Review comment:
       Maybe add a link to https://www.vaultproject.io/api/secret/transit ?

##########
File path: docs/source/python/parquet.rst
##########
@@ -604,3 +604,171 @@ One example is Azure Blob storage, which can be 
interfaced through the
 
     abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", 
container_name="XXXX")
     table = pq.read_table("file.parquet", filesystem=abfs)
+
+Parquet Modular Encryption (Columnar Encryption)
+------------------------------------------------
+
+Columnar encryption is supported for Parquet files starting from
+Apache Arrow 4.0.0.
+
+Parquet uses the envelope encryption practice, where file parts are encrypted
+with "data encryption keys" (DEKs), and the DEKs are encrypted with "master
+encryption keys" (MEKs). The DEKs are randomly generated by Parquet for each
+encrypted file/column. The MEKs are generated, stored and managed in a Key
+Management Service (KMS) of user’s choice.
+
+Reading and writing encrypted parquet files involves passing file encryption
+and decryption properties to :class:`~pyarrow.parquet.ParquetWriter` and to
+:class:`~.ParquetFile`, respectively.
+
+Writing an encrypted parquet:
+
+.. code-block:: python
+
+   encryption_properties = crypto_factory.file_encryption_properties(
+                                    kms_connection_config, encryption_config)
+   with pq.ParquetWriter(filename, schema,
+                        encryption_properties=encryption_properties) as writer:
+      writer.write_table(table)
+
+Reading an encrypted parquet:
+
+.. code-block:: python
+
+   decryption_properties = crypto_factory.file_decryption_properties(
+                                                    kms_connection_config)
+   parquet_file = pq.ParquetFile(filename,
+                              decryption_properties=decryption_properties)
+
+
+In order to create the encryption and decryption properties, a 
``CryptoFactory``
+should be created and initialized with KMS Client details, as described below.
+
+
+KMS Client
+~~~~~~~~~~
+
+The master encryption keys must be kept and managed in a production-grade KMS
+system, deployed in user's organization. Using Parquet encryption requires
+implementation of a client class for the KMS server.
+Any KmsClient implementation should implement the following informal interface:
+
+.. code-block:: python
+
+   class KmsClient:
+   def wrap_key(self, key_bytes, master_key_identifier):
+      """Wrap a key - encrypt it with the master key."""
+       raise NotImplementedError()
+
+   def unwrap_key(self, wrapped_key, master_key_identifier):
+      """Unwrap a key - decrypt it with the master key."""
+      raise NotImplementedError()
+
+An example KmsClient impelementation might look like the following:
+
+.. code-block:: python
+   class MyKmsClient(pq.KmsClient):
+      def __init__(self, kms_connection_configuration):
+         pq.KmsClient.__init__(self)
+         # Any KMS-specific initialization based on
+         # kms_connection_configuration comes here
+
+      def wrap_key(self, key_bytes, master_key_identifier):
+         wrapped_key = ... # call KMS to wrap key_bytes with key specified by
+                           # master_key_identifier
+         return wrapped_key
+
+      def unwrap_key(self, wrapped_key, master_key_identifier):
+         key_bytes = ... # call KMS to unwrap wrapped_key with key specified by
+                         # master_key_identifier
+         return key_bytes
+
+The concrete implementation will be loaded at runtime by a factory method
+provided by the user. This factory method will be used to initialize the
+``CryptoFactory`` for creating file encryption and decryption properties.
+For example, in order to use the ``MyKmsClient`` defined above:
+
+.. code-block:: python
+
+   def kms_client_factory(kms_connection_configuration):
+      return MyKmsClient(kms_connection_configuration)
+
+   crypto_factory = CryptoFactory(kms_client_factory)
+
+An `example <sample_vault_kms_client.py>`_ of such a class for an open source

Review comment:
       I don't think this will link correctly? Maybe you need to fill the full, 
actual github url (of where the file will eventually land)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to