adamreeve commented on code in PR #7111:
URL: https://github.com/apache/arrow-rs/pull/7111#discussion_r2020279917


##########
parquet/tests/encryption/encryption.rs:
##########
@@ -0,0 +1,709 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains tests for reading and writing encrypted Parquet files with the Arrow API
+
+use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
+use arrow::array::*;
+use arrow::error::Result as ArrowResult;
+use arrow_array::{Int32Array, RecordBatch};
+use arrow_schema::{DataType as ArrowDataType, DataType, Field, Schema};
+use parquet::arrow::arrow_reader::{
+    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
+};
+use parquet::arrow::ArrowWriter;
+use parquet::data_type::{ByteArray, ByteArrayType};
+use parquet::encryption::decrypt::FileDecryptionProperties;
+use parquet::encryption::encrypt::FileEncryptionProperties;
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::properties::WriterProperties;
+use parquet::file::writer::SerializedFileWriter;
+use parquet::schema::parser::parse_message_type;
+use std::fs::File;
+use std::sync::Arc;
+
+/// Reads a file with encrypted columns and a plaintext footer, supplying the
+/// footer key and per-column keys explicitly.
+#[test]
+fn test_non_uniform_encryption_plaintext_footer() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    // There is always a footer key, even with a plaintext footer;
+    // in that case it is used for signing the footer.
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key)
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+/// Reads a file whose AAD prefix is not stored in the file: decryption must
+/// succeed with the correct prefix and fail with a wrong or missing prefix.
+#[test]
+fn test_non_uniform_encryption_disabled_aad_storage() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path =
+        format!("{test_data}/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted");
+    let file = File::open(&path).unwrap();
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    // Can read successfully when providing the correct AAD prefix
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_key("double_field", column_1_key.clone())
+        .with_column_key("float_field", column_2_key.clone())
+        .with_aad_prefix(b"tester".to_vec())
+        .build()
+        .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+
+    // Using wrong AAD prefix should fail
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_key("double_field", column_1_key.clone())
+        .with_column_key("float_field", column_2_key.clone())
+        .with_aad_prefix(b"wrong_aad_prefix".to_vec())
+        .build()
+        .unwrap();
+
+    let file = File::open(&path).unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties);
+    let result = ArrowReaderMetadata::load(&file, options);
+    assert!(result.is_err());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer"
+    );
+
+    // Not providing any AAD prefix should fail as it isn't stored in the file
+    let decryption_properties = FileDecryptionProperties::builder(footer_key)
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    let file = File::open(&path).unwrap();
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties);
+    let result = ArrowReaderMetadata::load(&file, options);
+    assert!(result.is_err());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Parquet file was encrypted with AAD prefix that is not stored in the file"
+    );
+}
+
+/// Delegates to `encryption_agnostic::read_plaintext_footer_file_without_decryption_properties`,
+/// which reads a plaintext-footer file without supplying decryption properties.
+// NOTE(review): gated on the `snap` feature, presumably because the test file
+// is Snappy-compressed — TODO confirm.
+#[test]
+#[cfg(feature = "snap")]
+fn test_plaintext_footer_read_without_decryption() {
+    crate::encryption_agnostic::read_plaintext_footer_file_without_decryption_properties();
+}
+
+/// Reads a file with an encrypted footer and encrypted columns, supplying the
+/// footer key and per-column keys explicitly.
+#[test]
+fn test_non_uniform_encryption() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key)
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+/// Reads a uniformly encrypted file using only the footer key.
+#[test]
+fn test_uniform_encryption() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_code = b"0123456789012345".to_vec();
+    let decryption_properties = FileDecryptionProperties::builder(key_code).build().unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+/// Loading metadata of a file with an encrypted footer must fail when no
+/// decryption properties are supplied.
+#[test]
+fn test_decrypting_without_decryption_properties_fails() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let result = ArrowReaderMetadata::load(&file, ArrowReaderOptions::default());
+    assert!(result.is_err());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Parquet file has an encrypted footer but no decryption properties were provided"
+    );
+}
+
+/// Files encrypted with the AES_GCM_CTR_V1 algorithm are expected to be
+/// rejected with `ParquetError::NYI` when loading metadata.
+#[test]
+fn test_aes_ctr_encryption() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer_ctr.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let footer_key = b"0123456789012345".to_vec();
+    let column_1_key = b"1234567890123450".to_vec();
+    let column_2_key = b"1234567890123451".to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key)
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    let options =
+        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+    let metadata = ArrowReaderMetadata::load(&file, options);
+
+    match metadata {
+        Err(parquet::errors::ParquetError::NYI(s)) => {
+            assert!(s.contains("AES_GCM_CTR_V1"));
+        }
+        // Report what actually happened so a failure is diagnosable.
+        Err(e) => panic!("Expected ParquetError::NYI, got: {e}"),
+        Ok(_) => panic!("Expected ParquetError::NYI, but metadata loaded successfully"),
+    }
+}
+
+/// Reads a file with encrypted columns and a plaintext footer, resolving the
+/// keys by their metadata names ("kf", "kc1", "kc2") through a key retriever.
+#[test]
+fn test_non_uniform_encryption_plaintext_footer_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_plaintext_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever = TestKeyRetriever::new()
+        .with_key("kf".to_owned(), b"0123456789012345".to_vec())
+        .with_key("kc1".to_owned(), b"1234567890123450".to_vec())
+        .with_key("kc2".to_owned(), b"1234567890123451".to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+/// Reads a file with an encrypted footer and encrypted columns, resolving the
+/// keys by their metadata names ("kf", "kc1", "kc2") through a key retriever.
+#[test]
+fn test_non_uniform_encryption_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever = TestKeyRetriever::new()
+        .with_key("kf".to_owned(), b"0123456789012345".to_vec())
+        .with_key("kc1".to_owned(), b"1234567890123450".to_vec())
+        .with_key("kc2".to_owned(), b"1234567890123451".to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+/// Reads a uniformly encrypted file, resolving the footer key ("kf") through
+/// a key retriever.
+#[test]
+fn test_uniform_encryption_with_key_retriever() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+    let file = File::open(path).unwrap();
+
+    let key_retriever =
+        TestKeyRetriever::new().with_key("kf".to_owned(), b"0123456789012345".to_vec());
+
+    let decryption_properties =
+        FileDecryptionProperties::with_key_retriever(Arc::new(key_retriever))
+            .build()
+            .unwrap();
+
+    verify_encryption_test_file_read(file, decryption_properties);
+}
+
+/// Reads `file` using `decryption_properties` and checks both the decrypted
+/// metadata and the decoded record batches via `verify_encryption_test_data`.
+///
+/// Panics if the metadata cannot be loaded or any batch fails to decode.
+fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecryptionProperties) {
+    let options =
+        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
+    // Load the metadata separately so it can be verified alongside the data.
+    let reader_metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+    let metadata = reader_metadata.metadata();
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    let record_batches = builder
+        .build()
+        .unwrap()
+        .collect::<ArrowResult<Vec<RecordBatch>>>()
+        .unwrap();
+
+    verify_encryption_test_data(record_batches, metadata);
+}
+
+/// Collects the row count of every row group described by `metadata`.
+fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
+    let row_groups = metadata.row_groups();
+    row_groups.iter().map(|rg| rg.num_rows()).collect::<Vec<_>>()
+}
+
+/// Writes two batches with uniform (footer-key-only) encryption, reads them
+/// back with the same key, and checks row-group layout, batch sizes and values.
+#[test]
+fn test_uniform_encryption_roundtrip() {
+    let x0_arrays = [
+        Int32Array::from((0..100).collect::<Vec<_>>()),
+        Int32Array::from((100..150).collect::<Vec<_>>()),
+    ];
+    let x1_arrays = [
+        Int32Array::from((100..200).collect::<Vec<_>>()),
+        Int32Array::from((200..250).collect::<Vec<_>>()),
+    ];
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("x0", ArrowDataType::Int32, false),
+        Field::new("x1", ArrowDataType::Int32, false),
+    ]));
+
+    let file = tempfile::tempfile().unwrap();
+
+    let footer_key = b"0123456789012345";
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec())
+        .build()
+        .unwrap();
+
+    let props = WriterProperties::builder()
+        // Ensure multiple row groups
+        .set_max_row_group_size(50)
+        // Ensure multiple pages per row group
+        .set_write_batch_size(20)
+        .set_data_page_row_count_limit(20)
+        .with_file_encryption_properties(file_encryption_properties)
+        .build();
+
+    let mut writer =
+        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
+
+    for (x0, x1) in x0_arrays.into_iter().zip(x1_arrays) {
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(x0), Arc::new(x1)]).unwrap();
+        writer.write(&batch).unwrap();
+    }
+
+    writer.close().unwrap();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
+        .build()
+        .unwrap();
+
+    let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    // 150 rows with a 50-row row-group limit yields three row groups.
+    assert_eq!(&row_group_sizes(builder.metadata()), &[50, 50, 50]);
+
+    let batches = builder
+        .with_batch_size(100)
+        .build()
+        .unwrap()
+        .collect::<ArrowResult<Vec<_>>>()
+        .unwrap();
+
+    assert_eq!(batches.len(), 2);
+    assert!(batches.iter().all(|x| x.num_columns() == 2));
+
+    let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
+    assert_eq!(&batch_sizes, &[100, 50]);
+
+    // Concatenates the Int32 values of column `idx` across all read batches.
+    let column_values = |idx: usize| -> Vec<i32> {
+        batches
+            .iter()
+            .flat_map(|batch| {
+                batch
+                    .column(idx)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect()
+    };
+
+    let expected_x0_values: Vec<i32> = [0..100, 100..150].into_iter().flatten().collect();
+    assert_eq!(column_values(0), expected_x0_values);
+
+    let expected_x1_values: Vec<i32> = [100..200, 200..250].into_iter().flatten().collect();
+    assert_eq!(column_values(1), expected_x1_values);
+}
+
+/// Reads a file with an encrypted footer and encrypted columns and passes it to
+/// `read_and_roundtrip_to_encrypted_file` with encryption properties that use
+/// the same footer and column keys.
+#[test]
+fn test_write_non_uniform_encryption() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let footer_key = b"0123456789012345".to_vec(); // 128bit/16
+    let column_names = vec!["double_field", "float_field"];
+    let column_keys = vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_keys(column_names.clone(), column_keys.clone())
+        .build()
+        .unwrap();
+
+    let file_encryption_properties = FileEncryptionProperties::builder(footer_key)
+        .with_column_keys(column_names, column_keys)
+        .build()
+        .unwrap();
+
+    read_and_roundtrip_to_encrypted_file(&path, decryption_properties, file_encryption_properties);
+}
+
+// todo: currently we raise if writing with a plaintext footer, but we should support it

Review Comment:
   There was already an issue open for this, so I've just added a reference to it in the comment (#7320).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to