adamreeve commented on code in PR #7111: URL: https://github.com/apache/arrow-rs/pull/7111#discussion_r1992607802
########## parquet/src/arrow/arrow_writer/mod.rs: ########## @@ -457,20 +476,49 @@ type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>; #[derive(Default)] struct ArrowPageWriter { buffer: SharedColumnChunk, + #[cfg(feature = "encryption")] + page_encryptor: Option<PageEncryptor>, + #[cfg(not(feature = "encryption"))] + page_encryptor: Option<Never>, +} + +#[cfg(feature = "encryption")] +impl ArrowPageWriter { + pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self { + self.page_encryptor = page_encryptor; + self + } } impl PageWriter for ArrowPageWriter { fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> { let mut buf = self.buffer.try_lock().unwrap(); - let page_header = page.to_thrift_header(); - let header = { - let mut header = Vec::with_capacity(1024); - let mut protocol = TCompactOutputProtocol::new(&mut header); - page_header.write_to_out_protocol(&mut protocol)?; - Bytes::from(header) + + let data = match self.page_encryptor.as_ref() { + #[cfg(feature = "encryption")] + Some(encryptor) => { + let encrypted_buffer = encryptor.encrypt_page(&page)?; + Bytes::from(encrypted_buffer) + } + _ => page.compressed_page().buffer().clone(), + }; + + let mut page_header = page.to_thrift_header(); + page_header.compressed_page_size = data.len() as i32; Review Comment: Rather than overwriting the compressed page size here, we should probably encrypt the page before creating the header. That might require making the page mutable to allow overwriting its `buf` member? I'm not sure how best to handle this but it's a little hacky at the moment. ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +pub struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + pub fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + pub fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + pub fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + 
self.footer_key.key_metadata.as_ref() + } + + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + pub fn store_aad_prefix(&self) -> bool { + self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn encrypted_columns_in_schema( + &self, + schema: SchemaDescriptor, + ) -> std::result::Result<(), ParquetError> { + let schema_columns = schema + .columns() + .iter() + .map(|c| c.path().string()) + .collect::<HashSet<_>>(); + let encryption_columns = self + .column_keys + .keys() + .cloned() + .collect::<HashSet<String>>(); + if !encryption_columns.is_subset(&schema_columns) { + let mut columns_missing_in_schema = encryption_columns + .difference(&schema_columns) + .cloned() + .collect::<Vec<String>>(); + columns_missing_in_schema.sort(); + return Err(ParquetError::General( + format!( + "Column {} not found in schema", + columns_missing_in_schema.join(", ") + ) + .to_string(), + )); + } + Ok(()) + } +} + +pub struct EncryptionPropertiesBuilder { + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + encrypt_footer: bool, + store_aad_prefix: bool, +} + +impl EncryptionPropertiesBuilder { + pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + Self { + footer_key: EncryptionKey::new(footer_key), + column_keys: HashMap::default(), + aad_prefix: None, + encrypt_footer: true, + store_aad_prefix: true, + } + } + + pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self { Review Comment: These methods and the ones on `FileEncryptionProperties` are probably the main ones users will need to understand for encryption, so we should document them all. ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +pub struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + pub fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + pub fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + pub fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + pub fn encrypt_footer(&self) 
-> bool { + self.encrypt_footer + } + + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + pub fn store_aad_prefix(&self) -> bool { + self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn encrypted_columns_in_schema( + &self, + schema: SchemaDescriptor, + ) -> std::result::Result<(), ParquetError> { + let schema_columns = schema + .columns() + .iter() + .map(|c| c.path().string()) + .collect::<HashSet<_>>(); + let encryption_columns = self + .column_keys + .keys() + .cloned() + .collect::<HashSet<String>>(); + if !encryption_columns.is_subset(&schema_columns) { + let mut columns_missing_in_schema = encryption_columns + .difference(&schema_columns) + .cloned() + .collect::<Vec<String>>(); + columns_missing_in_schema.sort(); + return Err(ParquetError::General( + format!( + "Column {} not found in schema", + columns_missing_in_schema.join(", ") + ) + .to_string(), + )); + } + Ok(()) + } +} + +pub struct EncryptionPropertiesBuilder { + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + encrypt_footer: bool, + store_aad_prefix: bool, +} + +impl EncryptionPropertiesBuilder { + pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + Self { + footer_key: EncryptionKey::new(footer_key), + column_keys: HashMap::default(), + aad_prefix: None, + encrypt_footer: true, + store_aad_prefix: true, + } + } + + pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self { + self.encrypt_footer = !plaintext_footer; + self + } + + pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self { + self.footer_key = self.footer_key.with_metadata(metadata); + self + } + + pub fn with_column_key(mut self, column_path: String, encryption_key: 
EncryptionKey) -> Self { + self.column_keys.insert(column_path, encryption_key); + self + } + + pub fn with_aad_prefix_storage(mut self, store_aad_prefix: bool) -> Self { + self.store_aad_prefix = store_aad_prefix; + self + } + + pub fn build(self) -> FileEncryptionProperties { + FileEncryptionProperties { + encrypt_footer: self.encrypt_footer, + footer_key: self.footer_key, + column_keys: self.column_keys, + aad_prefix: self.aad_prefix, + store_aad_prefix: self.store_aad_prefix, + } + } +} + +#[derive(Debug)] +pub struct FileEncryptor { + properties: FileEncryptionProperties, + aad_file_unique: Vec<u8>, + file_aad: Vec<u8>, +} + +impl FileEncryptor { + pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> { + // Generate unique AAD for file + let rng = SystemRandom::new(); + let mut aad_file_unique = vec![0u8; 8]; + rng.fill(&mut aad_file_unique)?; + + let file_aad = match properties.aad_prefix.as_ref() { + None => aad_file_unique.clone(), + Some(aad_prefix) => [aad_prefix.clone(), aad_file_unique.clone()].concat(), + }; + + Ok(Self { + properties, + aad_file_unique, + file_aad, + }) + } + + pub fn properties(&self) -> &FileEncryptionProperties { + &self.properties + } + + pub fn file_aad(&self) -> &[u8] { Review Comment: We should document the difference between `file_aad` and `aad_file_unique` ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +pub struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + pub fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + pub fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + pub fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, Review Comment: I think `encrypt_footer` is just ignored at the moment. We should raise an error somewhere if we try to write with it set to false. 
########## parquet/src/arrow/arrow_writer/mod.rs: ########## @@ -3527,4 +3700,199 @@ mod tests { .unwrap(); assert_eq!(batches.len(), 0); } + + #[cfg(feature = "encryption")] + #[test] + fn test_uniform_encryption_roundtrip() { Review Comment: We should move these tests out to a test library, similar to #7279 ########## parquet/src/arrow/arrow_writer/mod.rs: ########## @@ -457,20 +476,49 @@ type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>; #[derive(Default)] struct ArrowPageWriter { buffer: SharedColumnChunk, + #[cfg(feature = "encryption")] + page_encryptor: Option<PageEncryptor>, + #[cfg(not(feature = "encryption"))] + page_encryptor: Option<Never>, +} + +#[cfg(feature = "encryption")] +impl ArrowPageWriter { + pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self { + self.page_encryptor = page_encryptor; + self + } } impl PageWriter for ArrowPageWriter { fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> { let mut buf = self.buffer.try_lock().unwrap(); - let page_header = page.to_thrift_header(); - let header = { - let mut header = Vec::with_capacity(1024); - let mut protocol = TCompactOutputProtocol::new(&mut header); - page_header.write_to_out_protocol(&mut protocol)?; - Bytes::from(header) + + let data = match self.page_encryptor.as_ref() { + #[cfg(feature = "encryption")] + Some(encryptor) => { + let encrypted_buffer = encryptor.encrypt_page(&page)?; + Bytes::from(encrypted_buffer) + } + _ => page.compressed_page().buffer().clone(), + }; + + let mut page_header = page.to_thrift_header(); + page_header.compressed_page_size = data.len() as i32; + + let mut header = Vec::with_capacity(1024); + match self.page_encryptor.as_ref() { Review Comment: I wonder if we could tidy this up by having something like a `PageModuleWriter` trait that `PageEncryptor` could implement, and also have a non-encrypted implementation to avoid the need for all the `#[cfg(feature = "encryption")]` ########## 
parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +pub struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + pub fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + pub fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + pub fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl 
FileEncryptionProperties { + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + pub fn store_aad_prefix(&self) -> bool { + self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn encrypted_columns_in_schema( + &self, + schema: SchemaDescriptor, + ) -> std::result::Result<(), ParquetError> { + let schema_columns = schema + .columns() + .iter() + .map(|c| c.path().string()) + .collect::<HashSet<_>>(); + let encryption_columns = self + .column_keys + .keys() + .cloned() + .collect::<HashSet<String>>(); + if !encryption_columns.is_subset(&schema_columns) { + let mut columns_missing_in_schema = encryption_columns + .difference(&schema_columns) + .cloned() + .collect::<Vec<String>>(); + columns_missing_in_schema.sort(); + return Err(ParquetError::General( + format!( + "Column {} not found in schema", Review Comment: This error could probably provide a bit more context, maybe something like: ```suggestion "The following columns with encryption keys specified were not found in the schema: {}", ``` ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +pub struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + pub fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + pub fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + pub fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + pub fn store_aad_prefix(&self) -> bool { + 
self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn encrypted_columns_in_schema( Review Comment: This sounds a bit like it's getting the encrypted columns; maybe something like `validate_encrypted_column_names` would be a better name? ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +pub struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + pub fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + pub fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + pub fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + pub fn store_aad_prefix(&self) -> bool { + self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn encrypted_columns_in_schema( + &self, + schema: SchemaDescriptor, + ) -> std::result::Result<(), ParquetError> { + let schema_columns = schema + .columns() + .iter() + .map(|c| c.path().string()) + .collect::<HashSet<_>>(); + let encryption_columns = self + 
.column_keys + .keys() + .cloned() + .collect::<HashSet<String>>(); + if !encryption_columns.is_subset(&schema_columns) { + let mut columns_missing_in_schema = encryption_columns + .difference(&schema_columns) + .cloned() + .collect::<Vec<String>>(); + columns_missing_in_schema.sort(); + return Err(ParquetError::General( + format!( + "Column {} not found in schema", + columns_missing_in_schema.join(", ") + ) + .to_string(), + )); + } + Ok(()) + } +} + +pub struct EncryptionPropertiesBuilder { + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + encrypt_footer: bool, + store_aad_prefix: bool, +} + +impl EncryptionPropertiesBuilder { + pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + Self { + footer_key: EncryptionKey::new(footer_key), + column_keys: HashMap::default(), + aad_prefix: None, + encrypt_footer: true, + store_aad_prefix: true, + } + } + + pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self { + self.encrypt_footer = !plaintext_footer; + self + } + + pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self { + self.footer_key = self.footer_key.with_metadata(metadata); + self + } + + pub fn with_column_key(mut self, column_path: String, encryption_key: EncryptionKey) -> Self { Review Comment: When we document this, we should make it clear that specifying any column encryption key means that any columns without a key provided will be left unencrypted. ########## parquet/src/encryption/encrypt.rs: ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey}; +use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; +use crate::thrift::TSerializable; +use ring::rand::{SecureRandom, SystemRandom}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use thrift::protocol::TCompactOutputProtocol; + +#[derive(Debug, Clone, PartialEq)] +pub struct EncryptionKey { + key: Vec<u8>, + key_metadata: Option<Vec<u8>>, +} + +impl EncryptionKey { + pub fn new(key: Vec<u8>) -> EncryptionKey { + Self { + key, + key_metadata: None, + } + } + + pub fn with_metadata(mut self, metadata: Vec<u8>) -> Self { + self.key_metadata = Some(metadata); + self + } + + pub fn key(&self) -> &Vec<u8> { + &self.key + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FileEncryptionProperties { + encrypt_footer: bool, + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + store_aad_prefix: bool, +} + +impl FileEncryptionProperties { + pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + EncryptionPropertiesBuilder::new(footer_key) + } + + pub fn encrypt_footer(&self) -> bool { + self.encrypt_footer + } + + pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> { + self.footer_key.key_metadata.as_ref() + } + + pub fn aad_prefix(&self) -> Option<&Vec<u8>> { + self.aad_prefix.as_ref() + } + + pub fn store_aad_prefix(&self) -> bool { + 
self.store_aad_prefix && self.aad_prefix.is_some() + } + + /// Checks if columns that are to be encrypted are present in schema + #[cfg(feature = "encryption")] + pub(crate) fn encrypted_columns_in_schema( + &self, + schema: SchemaDescriptor, + ) -> std::result::Result<(), ParquetError> { + let schema_columns = schema + .columns() + .iter() + .map(|c| c.path().string()) + .collect::<HashSet<_>>(); + let encryption_columns = self + .column_keys + .keys() + .cloned() + .collect::<HashSet<String>>(); + if !encryption_columns.is_subset(&schema_columns) { + let mut columns_missing_in_schema = encryption_columns + .difference(&schema_columns) + .cloned() + .collect::<Vec<String>>(); + columns_missing_in_schema.sort(); + return Err(ParquetError::General( + format!( + "Column {} not found in schema", + columns_missing_in_schema.join(", ") + ) + .to_string(), + )); + } + Ok(()) + } +} + +pub struct EncryptionPropertiesBuilder { + footer_key: EncryptionKey, + column_keys: HashMap<String, EncryptionKey>, + aad_prefix: Option<Vec<u8>>, + encrypt_footer: bool, + store_aad_prefix: bool, +} + +impl EncryptionPropertiesBuilder { + pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder { + Self { + footer_key: EncryptionKey::new(footer_key), + column_keys: HashMap::default(), + aad_prefix: None, + encrypt_footer: true, + store_aad_prefix: true, + } + } + + pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self { + self.encrypt_footer = !plaintext_footer; + self + } + + pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self { + self.footer_key = self.footer_key.with_metadata(metadata); + self + } + + pub fn with_column_key(mut self, column_path: String, encryption_key: EncryptionKey) -> Self { + self.column_keys.insert(column_path, encryption_key); + self + } + + pub fn with_aad_prefix_storage(mut self, store_aad_prefix: bool) -> Self { + self.store_aad_prefix = store_aad_prefix; + self + } + + pub fn build(self) -> FileEncryptionProperties { + 
FileEncryptionProperties { + encrypt_footer: self.encrypt_footer, + footer_key: self.footer_key, + column_keys: self.column_keys, + aad_prefix: self.aad_prefix, + store_aad_prefix: self.store_aad_prefix, + } + } +} + +#[derive(Debug)] +pub struct FileEncryptor { Review Comment: I don't think this needs to be pub ```suggestion pub(crate) struct FileEncryptor { ``` ########## parquet/src/file/writer.rs: ########## @@ -171,19 +183,37 @@ impl<W: Write + Send> SerializedFileWriter<W> { /// Creates new file writer. pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> { let mut buf = TrackedWrite::new(buf); - Self::start_file(&mut buf)?; + + #[cfg(feature = "encryption")] + let file_encryptor = match properties.file_encryption_properties.as_ref() { + None => None, + Some(encryption_props) => Some(Arc::new(FileEncryptor::new(encryption_props.clone())?)), + }; + + #[cfg(feature = "encryption")] + if properties.file_encryption_properties.is_some() { + properties + .file_encryption_properties + .clone() + .unwrap() + .encrypted_columns_in_schema(SchemaDescriptor::new(schema.clone()))?; + } + + Self::start_file(&properties, &mut buf)?; Ok(Self { buf, schema: schema.clone(), descr: Arc::new(SchemaDescriptor::new(schema)), - props: properties, + props: properties.clone(), Review Comment: I don't think this clone is needed ########## parquet/src/file/writer.rs: ########## @@ -523,12 +585,40 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { ) -> Result<C>, { self.assert_previous_writer_closed()?; + + #[cfg(feature = "encryption")] + let file_encryptor = self.file_encryptor.clone(); + #[cfg(feature = "encryption")] + let row_group_index = self.row_group_index as usize; + #[cfg(feature = "encryption")] + let column_index = self.column_index; + Ok(match self.next_column_desc() { Some(column) => { let props = self.props.clone(); let (buf, on_close) = self.get_on_close(); - let page_writer = Box::new(SerializedPageWriter::new(buf)); - 
Some(factory(column, props, page_writer, Box::new(on_close))?) + + #[cfg(feature = "encryption")] + let page_encryptor = PageEncryptor::create_if_column_encrypted( + &file_encryptor, + row_group_index, + column_index, + column.path().string(), + ); + + #[cfg(feature = "encryption")] + let page_writer = + SerializedPageWriter::new(buf).with_page_encryptor(page_encryptor); + + #[cfg(not(feature = "encryption"))] + let page_writer = SerializedPageWriter::new(buf); Review Comment: ```suggestion let page_writer = SerializedPageWriter::new(buf); #[cfg(feature = "encryption")] let page_writer = page_writer.with_page_encryptor(page_encryptor); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org