adamreeve commented on code in PR #7111: URL: https://github.com/apache/arrow-rs/pull/7111#discussion_r2002336070
########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + /// Create a new builder for encryption properties + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + /// Should the footer be encrypted + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + /// Retrieval metadata of key used for encryption of footer and (possibly) columns + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + /// AAD prefix string uniquely identifies the file and prevents file swapping + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + /// Should the AAD prefix should be stored in the file + pub fn store_aad_prefix(&self) -> bool { + self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn validate_encrypted_column_names( + &self, + schema: SchemaDescriptor, + ) -> std::result::Result<(), ParquetError> { + let schema_columns = schema + .columns() + .iter() + .map(|c| c.path().string()) + .collect::<HashSet<_>>(); + let encryption_columns = self + .column_keys + .keys() + .cloned() + .collect::<HashSet<String>>(); + if !encryption_columns.is_subset(&schema_columns) { + let mut columns_missing_in_schema = encryption_columns + .difference(&schema_columns) + .cloned() + .collect::<Vec<String>>(); + columns_missing_in_schema.sort(); + return Err(ParquetError::General( + format!( + "The following columns with encryption keys specified were not found in the schema: {}", + columns_missing_in_schema.join(", ") + ) + .to_string(), + )); + } + Ok(()) + } +} + +pub struct EncryptionPropertiesBuilder { + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + encrypt_footer: bool, + store_aad_prefix: bool, +} + +impl EncryptionPropertiesBuilder { + pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + Self { + footer_key: EncryptionKey::new(footer_key), + column_keys: HashMap::default(), + aad_prefix: None, + encrypt_footer: true, + store_aad_prefix: true, + } + } + + /// Set if the footer be encrypted Review Comment: ```suggestion /// Set if the footer should be encrypted. Defaults to true. ``` ########## parquet/src/encryption/ciphers.rs: ########## @@ -61,3 +64,118 @@ impl BlockDecryptor for RingGcmBlockDecryptor { Ok(result) } } + +pub trait BlockEncryptor: Debug + Send + Sync { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Result<Vec<u8>>; +} + +#[derive(Debug, Clone)] +struct CounterNonce { + start: u128, + counter: u128, +} + +impl CounterNonce { + pub fn new(rng: &SystemRandom) -> Result<Self> { + let mut buf = [0; 16]; + rng.fill(&mut buf)?; + + // Since this is a random seed value, endianness doesn't matter at all, + // and we can use whatever is platform-native. + let start = u128::from_ne_bytes(buf) & RIGHT_TWELVE; + let counter = start.wrapping_add(1); + + Ok(Self { start, counter }) + } + + /// One accessor for the nonce bytes to avoid potentially flipping endianness + #[inline] + pub fn get_bytes(&self) -> [u8; NONCE_LEN] { + self.counter.to_le_bytes()[0..NONCE_LEN].try_into().unwrap() + } +} + +impl NonceSequence for CounterNonce { + fn advance(&mut self) -> Result<ring::aead::Nonce, ring::error::Unspecified> { + // If we've wrapped around, we've exhausted this nonce sequence + if (self.counter & RIGHT_TWELVE) == (self.start & RIGHT_TWELVE) { + Err(ring::error::Unspecified) + } else { + // Otherwise, just advance and return the new value + let buf: [u8; NONCE_LEN] = self.get_bytes(); + self.counter = self.counter.wrapping_add(1); + Ok(ring::aead::Nonce::assume_unique_for_key(buf)) + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct RingGcmBlockEncryptor { + key: LessSafeKey, + nonce_sequence: CounterNonce, +} + +impl RingGcmBlockEncryptor { + /// Create a new `RingGcmBlockEncryptor` with a given key and random nonce. + /// The nonce will advance appropriately with each block encryption and + /// return an error if it wraps around. + pub(crate) fn new(key_bytes: &[u8]) -> Result<Self> { + let rng = SystemRandom::new(); + + // todo support other key sizes + let key = UnboundKey::new(&AES_128_GCM, key_bytes) + .map_err(|e| general_err!("Error creating AES key: {}", e))?; + let nonce = CounterNonce::new(&rng)?; + + Ok(Self { + key: LessSafeKey::new(key), + nonce_sequence: nonce, + }) + } +} + +impl BlockEncryptor for RingGcmBlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Result<Vec<u8>> { + // Create encrypted buffer. + // Format is: [ciphertext size, nonce, ciphertext, authentication tag] + let ciphertext_length = NONCE_LEN + plaintext.len() + TAG_LEN; + let mut ciphertext = Vec::with_capacity(SIZE_LEN + ciphertext_length); + ciphertext.extend((ciphertext_length as u32).to_le_bytes()); Review Comment: It's pretty unlikely that this will be an issue, but we should check for overflow here and return an error if the length doesn't fit in u32. Arrow C++ only seems to support sizes up to max signed 32 bit int rather than unsigned, but the format docs just say this is stored as "a 4-byte little endian", so I don't know that we need to be as restrictive here. ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + /// Create a new builder for encryption properties + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + /// Should the footer be encrypted + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + /// Retrieval metadata of key used for encryption of footer and (possibly) columns + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + /// AAD prefix string uniquely identifies the file and prevents file swapping + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + /// Should the AAD prefix should be stored in the file + pub fn store_aad_prefix(&self) -> bool { + self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn validate_encrypted_column_names( + &self, + schema: SchemaDescriptor, + ) -> std::result::Result<(), ParquetError> { + let schema_columns = schema + .columns() + .iter() + .map(|c| c.path().string()) + .collect::<HashSet<_>>(); + let encryption_columns = self + .column_keys + .keys() + .cloned() + .collect::<HashSet<String>>(); + if !encryption_columns.is_subset(&schema_columns) { + let mut columns_missing_in_schema = encryption_columns + .difference(&schema_columns) + .cloned() + .collect::<Vec<String>>(); + columns_missing_in_schema.sort(); + return Err(ParquetError::General( + format!( + "The following columns with encryption keys specified were not found in the schema: {}", + columns_missing_in_schema.join(", ") + ) + .to_string(), + )); + } + Ok(()) + } +} + +pub struct EncryptionPropertiesBuilder { + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + encrypt_footer: bool, + store_aad_prefix: bool, +} + +impl EncryptionPropertiesBuilder { + pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + Self { + footer_key: EncryptionKey::new(footer_key), + column_keys: HashMap::default(), + aad_prefix: None, + encrypt_footer: true, + store_aad_prefix: true, + } + } + + /// Set if the footer be encrypted + pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self { + self.encrypt_footer = !plaintext_footer; + self + } + + /// Set retrieval metadata of key used for encryption of footer and (possibly) columns + pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self { + self.footer_key = self.footer_key.with_metadata(metadata); + self + } + + /// Set the key used for encryption of a column. Note that if no column keys are provided but + /// footer key is all columns will be encrypted with the footer key. If column keys are provided + /// only the columns with a key will be encrypted even if footer key is provided. + pub fn with_column_key(mut self, column_name: &str, key: Vec<u8>) -> Self { + self.column_keys + .insert(column_name.to_string(), EncryptionKey::new(key)); + self + } + + /// Set the key used for encryption of a column and it's metadata. Key's metadata field is to + /// enable file readers to recover the key. For example, the key_metadata can keep a serialized + /// ID of a data key. Note that if no column keys are provided but footer key is all columns + /// will be encrypted with the footer key. If column keys are provided only the columns with + /// a key will be encrypted even if footer key is provided. + pub fn with_column_key_and_metadata( + mut self, + column_name: &str, + key: Vec<u8>, + metadata: Vec<u8>, + ) -> Self { + self.column_keys.insert( + column_name.to_string(), + EncryptionKey::new(key).with_metadata(metadata), + ); + self + } + + /// Set the keys used for encryption of columns. Analogous to + /// with_column_key but for multiple columns. + pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Self { + for (i, column_name) in column_names.into_iter().enumerate() { + self.column_keys + .insert(column_name.to_string(), EncryptionKey::new(keys[i].clone())); + } + self + } + + /// AAD prefix string uniquely identifies the file and allows to differentiate it e.g. from + /// older versions of the file or from other partition files in the same data set (table). + /// This string is optionally passed by a writer upon file creation. + pub fn with_aad_prefix(mut self, aad_prefix: Vec<u8>) -> Self { + self.aad_prefix = Some(aad_prefix); + self + } + + /// Should the AAD prefix be stored in the file Review Comment: ```suggestion /// Should the AAD prefix be stored in the file. If false, readers will need to provide the AAD prefix to be able to decrypt data. Defaults to true. ``` ########## parquet/tests/arrow_reader/encryption_util.rs: ########## @@ -17,7 +17,35 @@ use arrow_array::cast::AsArray; use arrow_array::{types, RecordBatch}; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, +}; + +use parquet::arrow::ArrowWriter; +use parquet::encryption::decrypt::FileDecryptionProperties; +use parquet::encryption::encrypt::FileEncryptionProperties; +use parquet::errors::Result; use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::WriterProperties; +use std::fs::File; + +/// Tests reading an encrypted file from the parquet-testing repository +pub(crate) fn verify_encryption_test_file_read( + file: File, + decryption_properties: FileDecryptionProperties, +) { + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); Review Comment: No need to clone, `decryption_properties` aren't being used again in this function, they can be moved. ```suggestion .with_file_decryption_properties(decryption_properties); ``` ########## parquet/tests/arrow_reader/encryption.rs: ########## @@ -201,3 +211,410 @@ fn verify_encryption_test_file_read(file: File, decryption_properties: FileDecry verify_encryption_test_data(record_batches, metadata); } + +fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> { + metadata.row_groups().iter().map(|x| x.num_rows()).collect() +} + +#[test] +fn test_uniform_encryption_roundtrip() { Review Comment: These round trip tests are to test writing, so it doesn't seem like they belong in the arrow_**reader** tests. We could create separate `arrow_writer` tests, but then there would probably need to be some encryption related utilities shared between the reading and writing tests. Maybe we should have a whole test library just for encryption instead? Or we could rename `arrow_reader` to `arrow_reader_writer`? ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + /// Create a new builder for encryption properties + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + /// Should the footer be encrypted + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + /// Retrieval metadata of key used for encryption of footer and (possibly) columns + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + /// AAD prefix string uniquely identifies the file and prevents file swapping + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + /// Should the AAD prefix should be stored in the file + pub fn store_aad_prefix(&self) -> bool { + self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn validate_encrypted_column_names( + &self, + schema: SchemaDescriptor, Review Comment: ```suggestion schema: &SchemaDescriptor, ``` ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + /// Create a new builder for encryption properties + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + /// Should the footer be encrypted + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + /// Retrieval metadata of key used for encryption of footer and (possibly) columns + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + /// AAD prefix string uniquely identifies the file and prevents file swapping + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + /// Should the AAD prefix should be stored in the file Review Comment: ```suggestion /// Should the AAD prefix be stored in the file ``` ########## parquet/src/file/writer.rs: ########## @@ -171,7 +183,30 @@ impl<W: Write + Send> SerializedFileWriter<W> { /// Creates new file writer. pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> { let mut buf = TrackedWrite::new(buf); - Self::start_file(&mut buf)?; + + #[cfg(feature = "encryption")] + let file_encryptor = match properties.file_encryption_properties.as_ref() { + None => None, + Some(encryption_props) => Some(Arc::new(FileEncryptor::new(encryption_props.clone())?)), + }; + + #[cfg(feature = "encryption")] + if properties.file_encryption_properties.is_some() { + properties + .file_encryption_properties + .clone() + .unwrap() + .validate_encrypted_column_names(SchemaDescriptor::new(schema.clone()))?; + } Review Comment: ```suggestion let schema_descriptor = SchemaDescriptor::new(schema.clone()); #[cfg(feature = "encryption")] if let Some(file_encryption_properties) = properties.file_encryption_properties.as_ref() { file_encryption_properties.validate_encrypted_column_names(&schema_descriptor)?; } ``` Plus change below to: ``` Ok(Self { buf, schema: schema, descr: schema_descriptor, ``` ########## parquet/tests/arrow_reader/encryption_util.rs: ########## @@ -17,7 +17,35 @@ use arrow_array::cast::AsArray; use arrow_array::{types, RecordBatch}; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, +}; + +use parquet::arrow::ArrowWriter; +use parquet::encryption::decrypt::FileDecryptionProperties; +use parquet::encryption::encrypt::FileEncryptionProperties; +use parquet::errors::Result; use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::WriterProperties; +use std::fs::File; + +/// Tests reading an encrypted file from the parquet-testing repository +pub(crate) fn verify_encryption_test_file_read( + file: File, + decryption_properties: FileDecryptionProperties, +) { + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); + let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); + + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); + let record_reader = builder.build().unwrap(); + let record_batches = record_reader + .map(|x| x.unwrap()) + .collect::<Vec<RecordBatch>>(); + + verify_encryption_test_data(record_batches, &metadata.metadata().clone()); Review Comment: ```suggestion verify_encryption_test_data(record_batches, metadata.metadata()); ``` ########## parquet/src/file/metadata/writer.rs: ########## @@ -182,6 +258,95 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { self.key_value_metadata = Some(key_value_metadata); self } + + #[cfg(feature = "encryption")] + pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self { + self.file_encryptor = file_encryptor; + self + } + + #[cfg(feature = "encryption")] + fn file_crypto_metadata( + file_encryptor: &Arc<FileEncryptor>, + ) -> Result<crate::format::FileCryptoMetaData> { + let properties = file_encryptor.properties(); + let supply_aad_prefix = properties + .aad_prefix() + .map(|_| !properties.store_aad_prefix()); + let encryption_algorithm = AesGcmV1 { + aad_prefix: properties.aad_prefix().cloned(), Review Comment: We shouldn't set `aad_prefix` here unless `properties.store_aad_prefix()` is true. We should also add a test that we can't read a file written with `store_aad_prefix = false` unless the AAD prefix is passed. When reading, I guess we could check `supply_aad_prefix` as an indication that an AAD prefix is expected and raise a more helpful error if one isn't supplied, but that's probably out of scope for this PR. ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + /// Create a new builder for encryption properties + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + /// Should the footer be encrypted + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + /// Retrieval metadata of key used for encryption of footer and (possibly) columns + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + /// AAD prefix string uniquely identifies the file and prevents file swapping + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + /// Should the AAD prefix should be stored in the file + pub fn store_aad_prefix(&self) -> bool { + self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn validate_encrypted_column_names( + &self, + schema: SchemaDescriptor, + ) -> std::result::Result<(), ParquetError> { + let schema_columns = schema + .columns() + .iter() + .map(|c| c.path().string()) + .collect::<HashSet<_>>(); + let encryption_columns = self + .column_keys + .keys() + .cloned() + .collect::<HashSet<String>>(); + if !encryption_columns.is_subset(&schema_columns) { + let mut columns_missing_in_schema = encryption_columns + .difference(&schema_columns) + .cloned() + .collect::<Vec<String>>(); + columns_missing_in_schema.sort(); + return Err(ParquetError::General( + format!( + "The following columns with encryption keys specified were not found in the schema: {}", + columns_missing_in_schema.join(", ") + ) + .to_string(), + )); + } + Ok(()) + } +} + +pub struct EncryptionPropertiesBuilder { + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + encrypt_footer: bool, + store_aad_prefix: bool, +} + +impl EncryptionPropertiesBuilder { + pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + Self { + footer_key: EncryptionKey::new(footer_key), + column_keys: HashMap::default(), + aad_prefix: None, + encrypt_footer: true, + store_aad_prefix: true, + } + } + + /// Set if the footer be encrypted + pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self { + self.encrypt_footer = !plaintext_footer; + self + } + + /// Set retrieval metadata of key used for encryption of footer and (possibly) columns + pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self { + self.footer_key = self.footer_key.with_metadata(metadata); + self + } + + /// Set the key used for encryption of a column. Note that if no column keys are provided but + /// footer key is all columns will be encrypted with the footer key. If column keys are provided + /// only the columns with a key will be encrypted even if footer key is provided. + pub fn with_column_key(mut self, column_name: &str, key: Vec<u8>) -> Self { + self.column_keys + .insert(column_name.to_string(), EncryptionKey::new(key)); + self + } + + /// Set the key used for encryption of a column and it's metadata. Key's metadata field is to + /// enable file readers to recover the key. For example, the key_metadata can keep a serialized + /// ID of a data key. Note that if no column keys are provided but footer key is all columns + /// will be encrypted with the footer key. If column keys are provided only the columns with + /// a key will be encrypted even if footer key is provided. + pub fn with_column_key_and_metadata( + mut self, + column_name: &str, + key: Vec<u8>, + metadata: Vec<u8>, + ) -> Self { + self.column_keys.insert( + column_name.to_string(), + EncryptionKey::new(key).with_metadata(metadata), + ); + self + } + + /// Set the keys used for encryption of columns. Analogous to + /// with_column_key but for multiple columns. + pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Self { + for (i, column_name) in column_names.into_iter().enumerate() { + self.column_keys + .insert(column_name.to_string(), EncryptionKey::new(keys[i].clone())); + } + self + } + + /// AAD prefix string uniquely identifies the file and allows to differentiate it e.g. from + /// older versions of the file or from other partition files in the same data set (table). + /// This string is optionally passed by a writer upon file creation. + pub fn with_aad_prefix(mut self, aad_prefix: Vec<u8>) -> Self { + self.aad_prefix = Some(aad_prefix); + self + } + + /// Should the AAD prefix be stored in the file + pub fn with_aad_prefix_storage(mut self, store_aad_prefix: bool) -> Self { + self.store_aad_prefix = store_aad_prefix; + self + } + + /// Build the encryption properties + pub fn build(self) -> FileEncryptionProperties { + FileEncryptionProperties { + encrypt_footer: self.encrypt_footer, + footer_key: self.footer_key, + column_keys: self.column_keys, + aad_prefix: self.aad_prefix, + store_aad_prefix: self.store_aad_prefix, + } + } +} + +#[derive(Debug)] +pub(crate) struct FileEncryptor { + properties: FileEncryptionProperties, + aad_file_unique: Vec<u8>, + file_aad: Vec<u8>, +} + +impl FileEncryptor { + pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> { + // Generate unique AAD for file + let rng = SystemRandom::new(); + let mut aad_file_unique = vec![0u8; 8]; + rng.fill(&mut aad_file_unique)?; + + let file_aad = match properties.aad_prefix.as_ref() { + None => aad_file_unique.clone(), + Some(aad_prefix) => [aad_prefix.clone(), aad_file_unique.clone()].concat(), + }; + + Ok(Self { + properties, + aad_file_unique, + file_aad, + }) + } + + /// Get the encryptor's file encryption properties + pub fn properties(&self) -> &FileEncryptionProperties { + &self.properties + } + + /// Combined AAD prefix and suffix for the file generated + pub fn file_aad(&self) -> &[u8] { + &self.file_aad + } + + /// Unique file identifier part of AAD suffix. This is generated per module by + /// concatenating unique file unique, module type, row group ordinal (all except Review Comment: ```suggestion /// Unique file identifier part of AAD suffix. The full AAD suffix is generated per module by /// concatenating aad_file_unique, module type, row group ordinal (all except ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org