alamb commented on code in PR #7111:
URL: https://github.com/apache/arrow-rs/pull/7111#discussion_r2019023428


##########
parquet/src/column/page_encryption.rs:
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::column::page::CompressedPage;
+use crate::encryption::ciphers::BlockEncryptor;
+use crate::encryption::encrypt::{encrypt_object, FileEncryptor};
+use crate::encryption::modules::{create_module_aad, ModuleType};
+use crate::errors::ParquetError;
+use crate::errors::Result;
+use crate::format::PageHeader;
+use crate::format::PageType;
+use bytes::Bytes;
+use std::io::Write;
+use std::sync::Arc;
+
+#[derive(Debug)]
+/// Encrypts page headers and page data for columns
+pub struct PageEncryptor {

Review Comment:
   I am pretty sure this is crate private -- could we mark it explicitly like that?
   
   ```suggestion
   pub(crate) struct PageEncryptor {
   ```



##########
parquet/src/encryption/decrypt.rs:
##########
@@ -324,6 +346,14 @@ impl DecryptionPropertiesBuilder {
             .insert(column_name.to_string(), decryption_key);
         self
     }
+
+    /// Specify multiple column decryption keys
+    pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Self {
+        for (column_name, key) in column_names.into_iter().zip(keys.into_iter()) {

Review Comment:
   it might be worth checking that `column_names` and `keys` have the same length (`zip` silently drops the extra elements when one side is longer than the other)



##########
parquet/src/file/writer.rs:
##########
@@ -523,12 +601,24 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
         ) -> Result<C>,
     {
         self.assert_previous_writer_closed()?;
+
+        let encryptor_context = self.get_page_encryptor_context();
+
         Ok(match self.next_column_desc() {
             Some(column) => {
                 let props = self.props.clone();
                 let (buf, on_close) = self.get_on_close();
-                let page_writer = Box::new(SerializedPageWriter::new(buf));
-                Some(factory(column, props, page_writer, Box::new(on_close))?)
+
+                let page_writer = SerializedPageWriter::new(buf);

Review Comment:
   thank you -- I think this looks very reasonable now



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -727,85 +819,173 @@ pub fn get_column_writers(
 ) -> Result<Vec<ArrowColumnWriter>> {
     let mut writers = Vec::with_capacity(arrow.fields.len());
     let mut leaves = parquet.columns().iter();
+    let column_factory = ArrowColumnWriterFactory::new();
     for field in &arrow.fields {
-        get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
+        column_factory.get_arrow_column_writer(
+            field.data_type(),
+            props,
+            &mut leaves,
+            &mut writers,
+        )?;
     }
     Ok(writers)
 }
 
-/// Gets the [`ArrowColumnWriter`] for the given `data_type`
-fn get_arrow_column_writer(
-    data_type: &ArrowDataType,
+/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
+#[cfg(feature = "encryption")]
+fn get_column_writers_with_encryptor(

Review Comment:
   I think we should file a follow-on ticket for a feature request of "Support writing encrypted files with multiple threads" or something like that.

   I agree it is ok as a follow-on.



##########
parquet/src/encryption/decrypt.rs:
##########
@@ -324,6 +346,14 @@ impl DecryptionPropertiesBuilder {
             .insert(column_name.to_string(), decryption_key);
         self
     }
+
+    /// Specify multiple column decryption keys
+    pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Self {
+        for (column_name, key) in column_names.into_iter().zip(keys.into_iter()) {

Review Comment:
   ```suggestion
        if column_names.len() != keys.len() {
            ... error
        }
        for (column_name, key) in column_names.into_iter().zip(keys.into_iter()) {
   ```
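
   For illustration, a minimal sketch of what that check could look like, assuming (hypothetically) that `with_column_keys` returned `Result<Self>` rather than `Self`:

   ```rust
   // Illustrative sketch only (not part of this PR): one way the length check could
   // look if `with_column_keys` were changed to return `Result<Self>` instead of
   // `Self`. It assumes the builder stores keys in a `column_keys` map, as the
   // existing `with_column_key` method does.
   pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
       if column_names.len() != keys.len() {
           return Err(ParquetError::General(format!(
               "The number of column names ({}) does not match the number of keys ({})",
               column_names.len(),
               keys.len()
           )));
       }
       for (column_name, key) in column_names.into_iter().zip(keys.into_iter()) {
           self.column_keys.insert(column_name.to_string(), key);
       }
       Ok(self)
   }
   ```

   Keeping the `Self`-returning signature would instead mean deferring the mismatch error to `build()`, so the exact shape is a design choice for the PR author.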



##########
parquet/src/encryption/encrypt.rs:
##########
@@ -0,0 +1,368 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Configuration and utilities for Parquet Modular Encryption
+
+use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor};
+use crate::errors::{ParquetError, Result};
+use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
+use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
+use crate::thrift::TSerializable;
+use ring::rand::{SecureRandom, SystemRandom};
+use std::collections::{HashMap, HashSet};
+use std::io::Write;
+use thrift::protocol::TCompactOutputProtocol;
+
+#[derive(Debug, Clone, PartialEq)]
+struct EncryptionKey {
+    key: Vec<u8>,
+    key_metadata: Option<Vec<u8>>,
+}
+
+impl EncryptionKey {
+    fn new(key: Vec<u8>) -> EncryptionKey {
+        Self {
+            key,
+            key_metadata: None,
+        }
+    }
+
+    fn with_metadata(mut self, metadata: Vec<u8>) -> Self {
+        self.key_metadata = Some(metadata);
+        self
+    }
+
+    fn key(&self) -> &Vec<u8> {
+        &self.key
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Defines how data in a Parquet file should be encrypted
+pub struct FileEncryptionProperties {
+    encrypt_footer: bool,
+    footer_key: EncryptionKey,
+    column_keys: HashMap<String, EncryptionKey>,
+    aad_prefix: Option<Vec<u8>>,
+    store_aad_prefix: bool,
+}
+
+impl FileEncryptionProperties {
+    /// Create a new builder for encryption properties with the given footer encryption key
+    pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
+        EncryptionPropertiesBuilder::new(footer_key)
+    }
+
+    /// Should the footer be encrypted
+    pub fn encrypt_footer(&self) -> bool {
+        self.encrypt_footer
+    }
+
+    /// Retrieval metadata of key used for encryption of footer and (possibly) columns
+    pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> {
+        self.footer_key.key_metadata.as_ref()
+    }
+
+    /// Retrieval of key used for encryption of footer and (possibly) columns
+    pub fn footer_key(&self) -> &Vec<u8> {
+        &self.footer_key.key
+    }
+
+    /// Get the column names, keys, and metadata for columns to be encrypted
+    pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
+        let mut column_names: Vec<String> = Vec::with_capacity(self.column_keys.len());
+        let mut keys: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
+        let mut meta: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
+        for (key, value) in self.column_keys.iter() {
+            column_names.push(key.clone());
+            keys.push(value.key.clone());
+            if let Some(metadata) = value.key_metadata.as_ref() {
+                meta.push(metadata.clone());
+            }
+        }
+        (column_names, keys, meta)
+    }
+
+    /// AAD prefix string uniquely identifies the file and prevents file swapping
+    pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
+        self.aad_prefix.as_ref()
+    }
+
+    /// Should the AAD prefix be stored in the file
+    pub fn store_aad_prefix(&self) -> bool {
+        self.store_aad_prefix && self.aad_prefix.is_some()
+    }
+
+    /// Checks if columns that are to be encrypted are present in schema
+    pub(crate) fn validate_encrypted_column_names(
+        &self,
+        schema: &SchemaDescriptor,
+    ) -> std::result::Result<(), ParquetError> {
+        let column_paths = schema
+            .columns()
+            .iter()
+            .map(|c| c.path().string())
+            .collect::<HashSet<_>>();
+        let encryption_columns = self
+            .column_keys
+            .keys()
+            .cloned()
+            .collect::<HashSet<String>>();
+        if !encryption_columns.is_subset(&column_paths) {
+            let mut columns_missing_in_schema = encryption_columns
+                .difference(&column_paths)
+                .cloned()
+                .collect::<Vec<String>>();
+            columns_missing_in_schema.sort();
+            return Err(ParquetError::General(
+                format!(
+                    "The following columns with encryption keys specified were not found in the schema: {}",
+                    columns_missing_in_schema.join(", ")
+                )
+                .to_string(),
+            ));
+        }
+        Ok(())
+    }
+}
+
+/// Builder for [`FileEncryptionProperties`]
+pub struct EncryptionPropertiesBuilder {
+    encrypt_footer: bool,
+    footer_key: EncryptionKey,
+    column_keys: HashMap<String, EncryptionKey>,
+    aad_prefix: Option<Vec<u8>>,
+    store_aad_prefix: bool,
+}
+
+impl EncryptionPropertiesBuilder {
+    /// Create a new [`EncryptionPropertiesBuilder`] with the given footer encryption key
+    pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
+        Self {
+            footer_key: EncryptionKey::new(footer_key),
+            column_keys: HashMap::default(),
+            aad_prefix: None,
+            encrypt_footer: true,
+            store_aad_prefix: false,
+        }
+    }
+
+    /// Set if the footer should be stored in plaintext (not encrypted). Defaults to false.
+    pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self {
+        self.encrypt_footer = !plaintext_footer;
+        self
+    }
+
+    /// Set retrieval metadata of key used for encryption of footer and (possibly) columns
+    pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self {
+        self.footer_key = self.footer_key.with_metadata(metadata);
+        self
+    }
+
+    /// Set the key used for encryption of a column. Note that if no column keys are configured then
+    /// all columns will be encrypted with the footer key.
+    /// If any column keys are configured then only the columns with a key will be encrypted.
+    pub fn with_column_key(mut self, column_name: &str, key: Vec<u8>) -> Self {
+        self.column_keys
+            .insert(column_name.to_string(), EncryptionKey::new(key));
+        self
+    }
+
+    /// Set the key used for encryption of a column and its metadata. The Key's metadata field is to

Review Comment:
   💯 for the comments
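
   For readers following along, here is a small usage sketch (not part of the diff) of the builder API shown above, based on how the tests later in this PR use it; it assumes the `parquet` crate is built with the `encryption` feature:

   ```rust
   use parquet::encryption::encrypt::FileEncryptionProperties;
   use parquet::file::properties::WriterProperties;

   fn example_writer_properties() -> WriterProperties {
       // With no column keys configured, every column is encrypted with the footer key.
       let _uniform = FileEncryptionProperties::builder(b"0123456789012345".to_vec())
           .build()
           .unwrap();

       // With column keys configured, only the listed columns are encrypted (each with
       // its own key); the remaining columns are written unencrypted.
       let non_uniform = FileEncryptionProperties::builder(b"0123456789012345".to_vec())
           .with_column_key("double_field", b"1234567890123450".to_vec())
           .with_column_key("float_field", b"1234567890123451".to_vec())
           .build()
           .unwrap();

       // Encryption properties are attached to the writer through WriterProperties.
       WriterProperties::builder()
           .with_file_encryption_properties(non_uniform)
           .build()
   }
   ```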



##########
parquet/tests/encryption/encryption_async.rs:
##########
@@ -367,3 +399,39 @@ async fn verify_encryption_test_file_read_async(
     verify_encryption_test_data(record_batches, metadata);
     Ok(())
 }
+
+async fn read_and_roundtrip_to_encrypted_file_async(

Review Comment:
   💯 



##########
parquet/src/encryption/ciphers.rs:
##########
@@ -61,3 +64,121 @@ impl BlockDecryptor for RingGcmBlockDecryptor {
         Ok(result)
     }
 }
+
+pub trait BlockEncryptor: Debug + Send + Sync {

Review Comment:
   likewise I don't think this is public so it would be great to make that explicit
   
   ```suggestion
   pub(crate) trait BlockEncryptor: Debug + Send + Sync {
   ```



##########
parquet/tests/encryption/encryption.rs:
##########
@@ -0,0 +1,709 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains tests for reading encrypted Parquet files with the Arrow API
+
+use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
+use arrow::array::*;
+use arrow::error::Result as ArrowResult;
+use arrow_array::{Int32Array, RecordBatch};
+use arrow_schema::{DataType as ArrowDataType, DataType, Field, Schema};
+use parquet::arrow::arrow_reader::{
+    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
+};
+use parquet::arrow::ArrowWriter;
+use parquet::data_type::{ByteArray, ByteArrayType};
+use parquet::encryption::decrypt::FileDecryptionProperties;
+use parquet::encryption::encrypt::FileEncryptionProperties;
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::properties::WriterProperties;
+use parquet::file::writer::SerializedFileWriter;
+use parquet::schema::parser::parse_message_type;
+use std::fs::File;
+use std::sync::Arc;
+
+#[test]
+fn test_non_uniform_encryption_plaintext_footer() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    // There is always a footer key even with a plaintext footer,
+    // but this is used for signing the footer.
+    let footer_key = "0123456789012345".as_bytes(); // 128bit/16
+    let column_1_key = "1234567890123450".as_bytes();
+    let column_2_key = "1234567890123451".as_bytes();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
+        .with_column_key("double_field", column_1_key.to_vec())
+        .with_column_key("float_field", column_2_key.to_vec())
+        .build()
+        .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_non_uniform_encryption_disabled_aad_storage() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path =
+        format!("{test_data}/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted");
+    let file = File::open(path.clone()).unwrap();
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    // Can read successfully when providing the correct AAD prefix
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_key("double_field", column_1_key.clone())
+        .with_column_key("float_field", column_2_key.clone())
+        .with_aad_prefix(b"tester".to_vec())
+        .build()
+        .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+
+    // Using wrong AAD prefix should fail
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_key("double_field", column_1_key.clone())
+        .with_column_key("float_field", column_2_key.clone())
+        .with_aad_prefix(b"wrong_aad_prefix".to_vec())
+        .build()
+        .unwrap();
+
+    let file = File::open(path.clone()).unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+    let result = ArrowReaderMetadata::load(&file, options.clone());
+    assert!(result.is_err());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer"
+    );
+
+    // Not providing any AAD prefix should fail as it isn't stored in the file
+    let decryption_properties = FileDecryptionProperties::builder(footer_key)
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    let file = File::open(path).unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+    let result = ArrowReaderMetadata::load(&file, options.clone());
+    assert!(result.is_err());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Parquet file was encrypted with AAD prefix that is not stored in the file"
+    );
+}
+
+#[test]
+#[cfg(feature = "snap")]
+fn test_plaintext_footer_read_without_decryption() {
+    crate::encryption_agnostic::read_plaintext_footer_file_without_decryption_properties();
+}
+
+#[test]
+fn test_non_uniform_encryption() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key)
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_uniform_encryption() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_code = b"0123456789012345".to_vec();
+    let decryption_properties = FileDecryptionProperties::builder(key_code).build().unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_decrypting_without_decryption_properties_fails() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let options = ArrowReaderOptions::default();
+    let result = ArrowReaderMetadata::load(&file, options.clone());
+    assert!(result.is_err());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Parquet file has an encrypted footer but no decryption properties were provided"
+    );
+}
+
+#[test]
+fn test_aes_ctr_encryption() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer_ctr.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let footer_key = b"0123456789012345".to_vec();
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key)
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    let options =
+        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+    let metadata = ArrowReaderMetadata::load(&file, options);
+
+    match metadata {
+        Err(parquet::errors::ParquetError::NYI(s)) => {
+            assert!(s.contains("AES_GCM_CTR_V1"));
+        }
+        _ => {
+            panic!("Expected ParquetError::NYI");
+        }
+    };
+}
+
+#[test]
+fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever = TestKeyRetriever::new()
+        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_non_uniform_encryption_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever = TestKeyRetriever::new()
+        .with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec())
+        .with_key("kc1".to_owned(), "1234567890123450".as_bytes().to_vec())
+        .with_key("kc2".to_owned(), "1234567890123451".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+#[test]
+fn test_uniform_encryption_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever =
+        TestKeyRetriever::new().with_key("kf".to_owned(), "0123456789012345".as_bytes().to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) {
+    let options =
+        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+    let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+    let metadata = reader_metadata.metadata();
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    let record_reader = builder.build().unwrap();
+    let record_batches = record_reader
+        .map(|x| x.unwrap())
+        .collect::<Vec<RecordBatch>>();
+
+    verify_encryption_test_data(record_batches, metadata);
+}
+
+fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
+    metadata.row_groups().iter().map(|x| x.num_rows()).collect()
+}
+
+#[test]
+fn test_uniform_encryption_roundtrip() {
+    let x0_arrays = [
+        Int32Array::from((0..100).collect::<Vec<_>>()),
+        Int32Array::from((100..150).collect::<Vec<_>>()),
+    ];
+    let x1_arrays = [
+        Int32Array::from((100..200).collect::<Vec<_>>()),
+        Int32Array::from((200..250).collect::<Vec<_>>()),
+    ];
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("x0", ArrowDataType::Int32, false),
+        Field::new("x1", ArrowDataType::Int32, false),
+    ]));
+
+    let file = tempfile::tempfile().unwrap();
+
+    let footer_key = b"0123456789012345";
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec())
+        .build()
+        .unwrap();
+
+    let props = WriterProperties::builder()
+        // Ensure multiple row groups
+        .set_max_row_group_size(50)
+        // Ensure multiple pages per row group
+        .set_write_batch_size(20)
+        .set_data_page_row_count_limit(20)
+        .with_file_encryption_properties(file_encryption_properties)
+        .build();
+
+    let mut writer =
+        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
+
+    for (x0, x1) in x0_arrays.into_iter().zip(x1_arrays.into_iter()) {
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(x0), Arc::new(x1)]).unwrap();
+        writer.write(&batch).unwrap();
+    }
+
+    writer.close().unwrap();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
+        .build()
+        .unwrap();
+
+    let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    assert_eq!(&row_group_sizes(builder.metadata()), &[50, 50, 50]);
+
+    let batches = builder
+        .with_batch_size(100)
+        .build()
+        .unwrap()
+        .collect::<ArrowResult<Vec<_>>>()
+        .unwrap();
+
+    assert_eq!(batches.len(), 2);
+    assert!(batches.iter().all(|x| x.num_columns() == 2));
+
+    let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
+
+    assert_eq!(&batch_sizes, &[100, 50]);
+
+    let x0_values: Vec<_> = batches
+        .iter()
+        .flat_map(|x| {
+            x.column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values()
+                .iter()
+                .cloned()
+        })
+        .collect();
+
+    let x1_values: Vec<_> = batches
+        .iter()
+        .flat_map(|x| {
+            x.column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values()
+                .iter()
+                .cloned()
+        })
+        .collect();
+
+    let expected_x0_values: Vec<_> = [0..100, 100..150].into_iter().flatten().collect();
+    assert_eq!(&x0_values, &expected_x0_values);
+
+    let expected_x1_values: Vec<_> = [100..200, 200..250].into_iter().flatten().collect();
+    assert_eq!(&x1_values, &expected_x1_values);
+}
+
+#[test]
+fn test_write_non_uniform_encryption() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_names = vec!["double_field", "float_field"];
+    let column_keys = vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_keys(column_names.clone(), column_keys.clone())
+        .build()
+        .unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key)
+        .with_column_keys(column_names, column_keys)
+        .build()
+        .unwrap();
+
+    read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
+}
+
+// todo: currently we raise if writing with plaintext footer, but we should support it

Review Comment:
   maybe it is worth a ticket to track this (and add a reference to this comment)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
