adamreeve commented on code in PR #6637:
URL: https://github.com/apache/arrow-rs/pull/6637#discussion_r1986422042


##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -670,14 +691,59 @@ impl<T: ChunkReader + 'static> Iterator for 
ReaderPageIterator<T> {
         let meta = rg.column(self.column_idx);
         let offset_index = self.metadata.offset_index();
         // `offset_index` may not exist and `i[rg_idx]` will be empty.
-        // To avoid `i[rg_idx][self.oolumn_idx`] panic, we need to filter out 
empty `i[rg_idx]`.
+        // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out 
empty `i[rg_idx]`.
         let page_locations = offset_index
             .filter(|i| !i[rg_idx].is_empty())
             .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
         let total_rows = rg.num_rows() as usize;
         let reader = self.reader.clone();
 
+        #[cfg(feature = "encryption")]
+        let crypto_context = if let Some(file_decryptor) = 
self.metadata.file_decryptor() {
+            let column_name = self
+                .metadata
+                .file_metadata()
+                .schema_descr()
+                .column(self.column_idx);
+
+            if file_decryptor.is_column_encrypted(column_name.name()) {
+                let data_decryptor = 
file_decryptor.get_column_data_decryptor(column_name.name());
+                let data_decryptor = match data_decryptor {
+                    Ok(data_decryptor) => data_decryptor,
+                    Err(err) => return Some(Err(err)),
+                };
+
+                let metadata_decryptor =
+                    
file_decryptor.get_column_metadata_decryptor(column_name.name());
+                let metadata_decryptor = match metadata_decryptor {
+                    Ok(metadata_decryptor) => metadata_decryptor,
+                    Err(err) => return Some(Err(err)),
+                };
+
+                let crypto_context = CryptoContext::new(
+                    rg_idx,
+                    self.column_idx,
+                    data_decryptor,
+                    metadata_decryptor,
+                    file_decryptor.file_aad().clone(),
+                );
+                Some(Arc::new(crypto_context))
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
         let ret = SerializedPageReader::new(reader, meta, total_rows, 
page_locations);
+
+        #[cfg(feature = "encryption")]
+        {
+            let ret = Ok(ret.unwrap().with_crypto_context(crypto_context));

Review Comment:
   Unwrap here could panic. This section including the `#[cfg(not(feature = 
"encryption"))]` part below could be simplified to something like:
   ```rust
           #[cfg(feature = "encryption")]
           let ret = ret.map(|reader| 
reader.with_crypto_context(crypto_context));
   
           Some(ret.map(|x| Box::new(x) as _))
   
   ```



##########
parquet/src/encryption/decryption.rs:
##########
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor};
+use crate::encryption::modules::{create_module_aad, ModuleType};
+use crate::errors::{ParquetError, Result};
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+pub fn read_and_decrypt<T: Read>(
+    decryptor: &Arc<dyn BlockDecryptor>,
+    input: &mut T,
+    aad: &[u8],
+) -> Result<Vec<u8>> {
+    let mut len_bytes = [0; 4];
+    input.read_exact(&mut len_bytes)?;
+    let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
+    let mut ciphertext = vec![0; 4 + ciphertext_len];
+    input.read_exact(&mut ciphertext[4..])?;
+
+    decryptor.decrypt(&ciphertext, aad.as_ref())
+}
+
+// CryptoContext is a data structure that holds the context required to
+// decrypt parquet modules (data pages, dictionary pages, etc.).
+#[derive(Debug, Clone)]
+pub struct CryptoContext {
+    pub(crate) row_group_ordinal: usize,
+    pub(crate) column_ordinal: usize,
+    pub(crate) page_ordinal: Option<usize>,
+    pub(crate) dictionary_page: bool,
+    // We have separate data and metadata decryptors because
+    // in GCM CTR mode, the metadata and data pages use
+    // different algorithms.
+    data_decryptor: Arc<dyn BlockDecryptor>,
+    metadata_decryptor: Arc<dyn BlockDecryptor>,
+    file_aad: Vec<u8>,
+}
+
+impl CryptoContext {
+    pub fn new(
+        row_group_ordinal: usize,
+        column_ordinal: usize,
+        data_decryptor: Arc<dyn BlockDecryptor>,
+        metadata_decryptor: Arc<dyn BlockDecryptor>,
+        file_aad: Vec<u8>,
+    ) -> Self {
+        Self {
+            row_group_ordinal,
+            column_ordinal,
+            page_ordinal: None,
+            dictionary_page: false,
+            data_decryptor,
+            metadata_decryptor,
+            file_aad,
+        }
+    }
+
+    pub fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
+        Self {
+            row_group_ordinal: self.row_group_ordinal,
+            column_ordinal: self.column_ordinal,
+            page_ordinal: Some(page_ordinal),
+            dictionary_page: false,
+            data_decryptor: self.data_decryptor.clone(),
+            metadata_decryptor: self.metadata_decryptor.clone(),
+            file_aad: self.file_aad.clone(),
+        }
+    }
+
+    pub(crate) fn create_page_header_aad(&self) -> Result<Vec<u8>> {
+        let module_type = if self.dictionary_page {
+            ModuleType::DictionaryPageHeader
+        } else {
+            ModuleType::DataPageHeader
+        };
+
+        create_module_aad(
+            self.file_aad(),
+            module_type,
+            self.row_group_ordinal,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub(crate) fn create_page_aad(&self) -> Result<Vec<u8>> {
+        let module_type = if self.dictionary_page {
+            ModuleType::DictionaryPage
+        } else {
+            ModuleType::DataPage
+        };
+
+        create_module_aad(
+            self.file_aad(),
+            module_type,
+            self.row_group_ordinal,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub fn for_dictionary_page(&self) -> Self {
+        Self {
+            row_group_ordinal: self.row_group_ordinal,
+            column_ordinal: self.column_ordinal,
+            page_ordinal: self.page_ordinal,
+            dictionary_page: true,
+            data_decryptor: self.data_decryptor.clone(),
+            metadata_decryptor: self.metadata_decryptor.clone(),
+            file_aad: self.file_aad.clone(),
+        }
+    }
+
+    pub fn data_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
+        &self.data_decryptor
+    }
+
+    pub fn metadata_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
+        &self.metadata_decryptor
+    }
+
+    pub fn file_aad(&self) -> &Vec<u8> {
+        &self.file_aad
+    }
+}
+
+/// FileDecryptionProperties hold keys and AAD data required to decrypt a 
Parquet file.
+#[derive(Debug, Clone, PartialEq)]
+pub struct FileDecryptionProperties {
+    footer_key: Vec<u8>,
+    column_keys: HashMap<String, Vec<u8>>,
+    aad_prefix: Option<Vec<u8>>,
+}
+
+impl FileDecryptionProperties {
+    /// Returns a new FileDecryptionProperties builder
+    pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
+        DecryptionPropertiesBuilder::new(footer_key)
+    }
+}
+
+pub struct DecryptionPropertiesBuilder {
+    footer_key: Vec<u8>,
+    column_keys: HashMap<String, Vec<u8>>,
+    aad_prefix: Option<Vec<u8>>,
+}
+
+impl DecryptionPropertiesBuilder {
+    pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
+        Self {
+            footer_key,
+            column_keys: HashMap::default(),
+            aad_prefix: None,
+        }
+    }
+
+    pub fn build(self) -> Result<FileDecryptionProperties> {
+        Ok(FileDecryptionProperties {
+            footer_key: self.footer_key,
+            column_keys: self.column_keys,
+            aad_prefix: self.aad_prefix,
+        })
+    }
+
+    pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
+        self.aad_prefix = Some(value);
+        self
+    }
+
+    pub fn with_column_key(mut self, column_name: &str, decryption_key: 
Vec<u8>) -> Self {
+        self.column_keys
+            .insert(column_name.to_string(), decryption_key);
+        self
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct FileDecryptor {
+    decryption_properties: FileDecryptionProperties,
+    footer_decryptor: Option<Arc<dyn BlockDecryptor>>,
+    file_aad: Vec<u8>,
+}
+
+impl PartialEq for FileDecryptor {
+    fn eq(&self, other: &Self) -> bool {
+        self.decryption_properties == other.decryption_properties
+    }
+}
+
+impl FileDecryptor {
+    pub(crate) fn new(
+        decryption_properties: &FileDecryptionProperties,
+        aad_file_unique: Vec<u8>,
+        aad_prefix: Vec<u8>,
+    ) -> Result<Self, ParquetError> {
+        let file_aad = [aad_prefix.as_slice(), 
aad_file_unique.as_slice()].concat();
+        // todo decr: if no key available yet (not set in properties, should 
be retrieved from metadata)
+        let footer_decryptor = 
RingGcmBlockDecryptor::new(&decryption_properties.footer_key)
+            .map_err(|e| {
+                let msg = String::from("Invalid footer key. ")
+                    + e.to_string().replace("Parquet error: ", "").as_str();
+                ParquetError::General(msg)

Review Comment:
   ```suggestion
               .map_err(|e| general_err!("Invalid footer key. {}", 
e.to_string().replace("Parquet error: ", "")))?;
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1788,6 +1859,151 @@ mod tests {
         assert!(col.value(2).is_nan());
     }
 
+    #[test]

Review Comment:
   Can we also add a test for the 
`encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted` file? It 
looks like that will need a change to `get_file_decryptor` so it uses the AAD 
from the file decryption properties rather than the one from the file.
   
   And if an AAD prefix is specified in both the decryption properties and the 
file, the one in the decryption properties should be used.
   
   A test that specifying the wrong AAD prefix results in an error when the AAD 
is stored in the file would also be useful.
   
   This could possibly be a follow up change rather than needing to be done in 
this PR though.



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -1167,6 +1297,8 @@ mod tests {
             data: data.clone(),
             metadata: metadata.clone(),
             requests: Default::default(),
+            #[cfg(feature = "encryption")]
+            file_decryption_properties: None,

Review Comment:
   This could be tidied up by using a similar approach to what we've done 
elsewhere and making a `TestReader::new` method and 
`TestReader::with_file_decryption_properties`



##########
parquet/src/errors.rs:
##########
@@ -132,6 +132,13 @@ impl From<object_store::Error> for ParquetError {
     }
 }
 
+#[cfg(feature = "encryption")]
+impl From<ring::error::Unspecified> for ParquetError {
+    fn from(e: ring::error::Unspecified) -> ParquetError {
+        ParquetError::External(Box::new(e))

Review Comment:
   Probably a follow up piece of work rather than for this PR, but it seems 
like having an error type specifically for encryption related errors could be 
useful?



##########
parquet/src/file/metadata/mod.rs:
##########
@@ -214,6 +232,12 @@ impl ParquetMetaData {
         &self.file_metadata
     }
 
+    /// Returns file decryptor as reference.
+    #[cfg(feature = "encryption")]
+    pub fn file_decryptor(&self) -> &Option<FileDecryptor> {
+        &self.file_decryptor

Review Comment:
   It's probably a bit more convenient to work with an `Option<&FileDecryptor>`:
   ```suggestion
       pub fn file_decryptor(&self) -> Option<&FileDecryptor> {
           self.file_decryptor.as_ref()
   ```



##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -164,14 +170,36 @@ impl AsyncFileReader for ParquetObjectReader {
     // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just 
delegated to
     // `Self::get_bytes`.
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let file_size = self.meta.size;
+            let metadata = ParquetMetaDataReader::new()
+                .with_column_indexes(self.preload_column_index)
+                .with_offset_indexes(self.preload_offset_index)
+                .with_prefetch_hint(self.metadata_size_hint);
+            #[cfg(feature = "encryption")]
+            let metadata = metadata
+                
.with_decryption_properties(self.file_decryption_properties.clone().as_ref());

Review Comment:
   It looks like this `with_decryption_properties` call should be removed as 
this is now in the `get_encrypted_metadata` method.



##########
parquet/src/encryption/mod.rs:
##########
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Encryption implementation specific to Parquet, as described
+//! in the 
[spec](https://github.com/apache/parquet-format/blob/master/Encryption.md).
+
+pub mod ciphers;
+pub mod decryption;
+pub mod modules;

Review Comment:
   I don't think the `ciphers` or `modules` modules should need to be public. 
Can these be made private? Or at least `pub(crate)`.



##########
parquet/src/file/metadata/reader.rs:
##########
@@ -372,6 +414,11 @@ impl ParquetMetaDataReader {
         mut fetch: F,
         file_size: usize,
     ) -> Result<()> {
+        #[cfg(feature = "encryption")]

Review Comment:
   The `encryption` and `not(encryption)` blocks here are identical.



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -175,6 +227,15 @@ impl ArrowReaderMetadata {
     ) -> Result<Self> {
         // TODO: this is all rather awkward. It would be nice if 
AsyncFileReader::get_metadata
         // took an argument to fetch the page indexes.
+        #[cfg(feature = "encryption")]
+        let mut metadata = if options.file_decryption_properties.is_some() {

Review Comment:
   With these recent changes it doesn't look like we properly handle when a 
file is encrypted but no decryption properties are provided. Here we check that 
file decryption properties are set before calling `get_encrypted_metadata`, and 
then later in `ParquetMetadataReader::decrypt_metadata` we raise an error if 
the footer is encrypted but no decryption properties are set 
(https://github.com/apache/arrow-rs/blob/276fc1a153c7a64bdf50401d007224b12ee1ddd8/parquet/src/file/metadata/reader.rs#L730).
   
   The code here also seems to have the same problem: 
https://github.com/apache/arrow-rs/blob/276fc1a153c7a64bdf50401d007224b12ee1ddd8/parquet/src/file/metadata/reader.rs#L577
   
   I think we can just remove the `if 
options.file_decryption_properties.is_some()` check, and maybe rename 
`get_encrypted_metadata` etc to indicate that the metadata isn't necessarily 
encrypted. (maybe `get_metadata_with_encryption`?). Plus 
`ParquetMetaDataReader::decrypt_metadata` also seems misnamed. 
   
   Can we fix that and add a test case? And also a test case for a file with an 
encrypted footer that's run when encryption is disabled to check we get this 
error: 
https://github.com/apache/arrow-rs/blob/276fc1a153c7a64bdf50401d007224b12ee1ddd8/parquet/src/file/metadata/reader.rs#L571



##########
parquet/src/file/metadata/mod.rs:
##########
@@ -183,11 +193,19 @@ impl ParquetMetaData {
         ParquetMetaData {
             file_metadata,
             row_groups,
+            #[cfg(feature = "encryption")]
+            file_decryptor: None,
             column_index: None,
             offset_index: None,
         }
     }
 
+    #[allow(missing_docs)]

Review Comment:
   Can this just have a doc comment added :)



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -2336,4 +2494,245 @@ mod tests {
         let result = reader.try_collect::<Vec<_>>().await.unwrap();
         assert_eq!(result.len(), 1);
     }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_non_uniform_encryption_plaintext_footer() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        // There is always a footer key even with a plaintext footer,
+        // but this is used for signing the footer.
+        let footer_key = "0123456789012345".as_bytes().to_vec(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes().to_vec();
+        let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+        let decryption_properties = 
FileDecryptionProperties::builder(footer_key)
+            .with_column_key("double_field", column_1_key)
+            .with_column_key("float_field", column_2_key)
+            .build()
+            .unwrap();
+
+        let _ = verify_encryption_test_file_read_async(&mut file, 
decryption_properties).await;
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_misspecified_encryption_keys() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+        // There is always a footer key even with a plaintext footer,
+        // but this is used for signing the footer.
+        let footer_key = "0123456789012345".as_bytes(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes();
+        let column_2_key = "1234567890123451".as_bytes();
+
+        // read file with keys and check for expected error message
+        async fn check_for_error(
+            expected_message: &str,
+            path: &String,
+            footer_key: &[u8],
+            column_1_key: &[u8],
+            column_2_key: &[u8],
+        ) {
+            let mut file = File::open(&path).await.unwrap();
+
+            let mut decryption_properties = 
FileDecryptionProperties::builder(footer_key.to_vec());
+
+            if column_1_key.is_empty() {
+                decryption_properties =
+                    decryption_properties.with_column_key("double_field", 
column_1_key.to_vec());
+            }
+
+            if column_2_key.is_empty() {
+                decryption_properties =
+                    decryption_properties.with_column_key("float_field", 
column_2_key.to_vec());
+            }
+
+            let decryption_properties = decryption_properties.build().unwrap();
+
+            match verify_encryption_test_file_read_async(&mut file, 
decryption_properties).await {
+                Ok(_) => {
+                    panic!("did not get expected error")
+                }
+                Err(e) => {
+                    assert_eq!(e.to_string(), expected_message);
+                }
+            }
+        }
+
+        // Too short footer key
+        check_for_error(
+            "Parquet error: Invalid footer key. Failed to create AES key",
+            &path,
+            "bad_pwd".as_bytes(),
+            column_1_key,
+            column_2_key,
+        )
+        .await;
+
+        // Wrong footer key
+        check_for_error(
+            "Parquet error: Provided footer key was unable to decrypt parquet 
footer",
+            &path,
+            "1123456789012345".as_bytes(),
+            column_1_key,
+            column_2_key,
+        )
+        .await;
+
+        // todo: should this be double_field?

Review Comment:
   These todos should all be addressed. It looks like `column_2_key` is used 
for `float_field` so the error should be about `double_field`, so something is 
wrong here.
   
   I think the problem is the `if column_1_key.is_empty()` and `if 
column_2_key.is_empty()` checks above. These should surely be `if 
!column_1_key.is_empty()` and `if !column_2_key.is_empty()`.
   
   That should also fix the expected key length error problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to