alamb commented on code in PR #8445:
URL: https://github.com/apache/arrow-rs/pull/8445#discussion_r2382269682


##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -1158,6 +1160,358 @@ impl PageHeader {
     }
 }
 
+/////////////////////////////////////////////////
+// helper functions for writing file meta data
+
+// serialize the bits of the column chunk needed for a thrift ColumnMetaData
+// struct ColumnMetaData {
+//   1: required Type type
+//   2: required list<Encoding> encodings
+//   3: required list<string> path_in_schema
+//   4: required CompressionCodec codec
+//   5: required i64 num_values
+//   6: required i64 total_uncompressed_size
+//   7: required i64 total_compressed_size
+//   8: optional list<KeyValue> key_value_metadata
+//   9: required i64 data_page_offset
+//   10: optional i64 index_page_offset
+//   11: optional i64 dictionary_page_offset
+//   12: optional Statistics statistics;
+//   13: optional list<PageEncodingStats> encoding_stats;
+//   14: optional i64 bloom_filter_offset;
+//   15: optional i32 bloom_filter_length;
+//   16: optional SizeStatistics size_statistics;
+//   17: optional GeospatialStatistics geospatial_statistics;
+// }
+pub(crate) fn serialize_column_meta_data<W: Write>(
+    column_chunk: &ColumnChunkMetaData,
+    w: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    use crate::file::statistics::page_stats_to_thrift;
+
+    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
+    column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+    let path = column_chunk.column_descr.path().parts();
+    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
+    path.write_thrift_field(w, 3, 2)?;
+    column_chunk.compression.write_thrift_field(w, 4, 3)?;
+    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
+    column_chunk
+        .total_uncompressed_size
+        .write_thrift_field(w, 6, 5)?;
+    column_chunk
+        .total_compressed_size
+        .write_thrift_field(w, 7, 6)?;
+    // no key_value_metadata here
+    let mut last_field_id = 
column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
+    if let Some(index_page_offset) = column_chunk.index_page_offset {
+        last_field_id = index_page_offset.write_thrift_field(w, 10, 
last_field_id)?;
+    }
+    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
+        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, 
last_field_id)?;
+    }
+    // PageStatistics is the same as thrift Statistics, but writable
+    let stats = page_stats_to_thrift(column_chunk.statistics());
+    if let Some(stats) = stats {
+        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+    }
+    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+        last_field_id = page_encoding_stats.write_thrift_field(w, 13, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+        last_field_id = bloom_filter_length.write_thrift_field(w, 15, 
last_field_id)?;
+    }
+
+    // SizeStatistics
+    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+        || column_chunk.repetition_level_histogram.is_some()
+        || column_chunk.definition_level_histogram.is_some()
+    {
+        let repetition_level_histogram = column_chunk
+            .repetition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        let definition_level_histogram = column_chunk
+            .definition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        Some(SizeStatistics {
+            unencoded_byte_array_data_bytes: 
column_chunk.unencoded_byte_array_data_bytes,
+            repetition_level_histogram,
+            definition_level_histogram,
+        })
+    } else {
+        None
+    };
+    if let Some(size_stats) = size_stats {
+        size_stats.write_thrift_field(w, 16, last_field_id)?;
+    }
+
+    // TODO: field 17 geo spatial stats here
+    w.write_struct_end()
+}
+
+// temp struct used for writing
+pub(crate) struct FileMeta<'a> {
+    pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
+    pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
+    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
+}
+
+impl<'a> WriteThrift for FileMeta<'a> {
+    const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    #[allow(unused_assignments)]
+    fn write_thrift<W: Write>(&self, writer: &mut 
ThriftCompactOutputProtocol<W>) -> Result<()> {
+        self.file_metadata
+            .version
+            .write_thrift_field(writer, 1, 0)?;
+
+        // field 2 is schema. do depth-first traversal of tree, converting to 
SchemaElement and
+        // writing along the way.
+        let root = self.file_metadata.schema_descr().root_schema_ptr();
+        let schema_len = num_nodes(&root);
+        writer.write_field_begin(FieldType::List, 2, 1)?;
+        writer.write_list_begin(ElementType::Struct, schema_len)?;
+        // recursively write Type nodes as SchemaElements
+        write_schema(&root, writer)?;
+
+        self.file_metadata
+            .num_rows
+            .write_thrift_field(writer, 3, 2)?;
+
+        // this will call RowGroupMetaData::write_thrift
+        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 
3)?;
+
+        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
+            last_field_id = kv_metadata.write_thrift_field(writer, 5, 
last_field_id)?;
+        }
+        if let Some(created_by) = self.file_metadata.created_by() {
+            last_field_id = created_by.write_thrift_field(writer, 6, 
last_field_id)?;
+        }
+        if let Some(column_orders) = self.file_metadata.column_orders() {
+            last_field_id = column_orders.write_thrift_field(writer, 7, 
last_field_id)?;
+        }
+        if let Some(algo) = self.encryption_algorithm.as_ref() {
+            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
+        }
+        if let Some(key) = self.footer_signing_key_metadata.as_ref() {
+            key.as_slice()
+                .write_thrift_field(writer, 9, last_field_id)?;
+        }
+
+        writer.write_struct_end()
+    }
+}
+
+fn write_schema<W: Write>(
+    node: &TypePtr,
+    writer: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    match node.as_ref() {
+        crate::schema::types::Type::PrimitiveType {
+            basic_info,
+            physical_type,
+            type_length,
+            scale,
+            precision,
+        } => {
+            let element = SchemaElement {
+                type_: Some(*physical_type),
+                type_length: if *type_length >= 0 {
+                    Some(*type_length)
+                } else {
+                    None
+                },
+                repetition_type: Some(basic_info.repetition()),
+                name: basic_info.name(),

Review Comment:
   I double checked and this does not allocate a string (uses &str) 👍 



##########
parquet/src/file/metadata/memory.rs:
##########
@@ -92,6 +92,25 @@ impl HeapSize for RowGroupMetaData {
     }
 }
 
+#[cfg(feature = "encryption")]
+impl HeapSize for ColumnChunkMetaData {
+    fn heap_size(&self) -> usize {
+        // don't count column_descr here because it is already counted in
+        // FileMetaData
+        self.encodings.heap_size()
+            + self.file_path.heap_size()
+            + self.compression.heap_size()
+            + self.statistics.heap_size()
+            + self.encoding_stats.heap_size()
+            + self.unencoded_byte_array_data_bytes.heap_size()
+            + self.repetition_level_histogram.heap_size()
+            + self.definition_level_histogram.heap_size()
+            + self.column_crypto_metadata.heap_size()

Review Comment:
   I was at first annoyed at the replication here, but the alternative is to 
`#cfg` out a different function or something, which is not obviously simpler
   
   Though it would make it easier to keep these functions in sync 🤔 



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -135,8 +137,8 @@ struct FileMetaData<'a> {
   5: optional list<KeyValue> key_value_metadata
   6: optional string created_by
   7: optional list<ColumnOrder> column_orders;
-  8: optional EncryptionAlgorithm<'a> encryption_algorithm
-  9: optional binary<'a> footer_signing_key_metadata
+  8: optional EncryptionAlgorithm encryption_algorithm

Review Comment:
   I haven't been following along, but what is the significance of not using 
references (aka removing `<'a>`)? 🤔 



##########
parquet/src/file/metadata/mod.rs:
##########
@@ -2064,7 +2070,7 @@ mod tests {
         #[cfg(not(feature = "encryption"))]
         let base_expected_size = 2280;
         #[cfg(feature = "encryption")]
-        let base_expected_size = 2616;
+        let base_expected_size = 2712;

Review Comment:
   As a follow on, what do you think about Box'ing that -- 
`Option<Box<ColumnCryptoMetaData>>` 🤔  I have also thought recently the `#cfg`s 
for encryption make the code harder to work with (though they have the benefit 
there is no overhead if the feature is not enabled) 🤔 



##########
parquet/benches/metadata.rs:
##########
@@ -242,40 +244,36 @@ fn criterion_benchmark(c: &mut Criterion) {
     #[cfg(feature = "arrow")]
     c.bench_function("page headers", |b| {
         b.iter(|| {
-            metadata.row_groups.iter().for_each(|rg| {
-                rg.columns.iter().for_each(|col| {
-                    if let Some(col_meta) = &col.meta_data {
-                        if let Some(dict_offset) = 
col_meta.dictionary_page_offset {
-                            parquet::thrift::bench_page_header(
-                                &file_bytes.slice(dict_offset as usize..),
-                            );
-                        }
+            for rg in metadata.row_groups() {
+                for col in rg.columns() {
+                    if let Some(dict_offset) = col.dictionary_page_offset() {
                         parquet::thrift::bench_page_header(
-                            &file_bytes.slice(col_meta.data_page_offset as 
usize..),
+                            &file_bytes.slice(dict_offset as usize..),
                         );
                     }
-                });
-            });
+                    parquet::thrift::bench_page_header(
+                        &file_bytes.slice(col.data_page_offset() as usize..),
+                    );
+                }
+            }
         })
     });
 
     #[cfg(feature = "arrow")]
     c.bench_function("page headers (no stats)", |b| {

Review Comment:
   when reviewing these benchmarks, it seems like maybe it is time to remove 
benchmarks for page header statistics  (as they aren't really useful / widely 
used)



##########
parquet/src/arrow/async_writer/mod.rs:
##########
@@ -247,7 +247,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
     /// Unlike [`Self::close`] this does not consume self
     ///
     /// Attempting to write after calling finish will result in an error
-    pub async fn finish(&mut self) -> Result<crate::format::FileMetaData> {
+    pub async fn finish(&mut self) -> Result<ParquetMetaData> {

Review Comment:
   I think that is a much more reasonable API, FWIW, as the ParquetMetadata is 
what is used in the rest of the APIs



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -1158,6 +1160,358 @@ impl PageHeader {
     }
 }
 
+/////////////////////////////////////////////////
+// helper functions for writing file meta data
+
+// serialize the bits of the column chunk needed for a thrift ColumnMetaData
+// struct ColumnMetaData {
+//   1: required Type type
+//   2: required list<Encoding> encodings
+//   3: required list<string> path_in_schema
+//   4: required CompressionCodec codec
+//   5: required i64 num_values
+//   6: required i64 total_uncompressed_size
+//   7: required i64 total_compressed_size
+//   8: optional list<KeyValue> key_value_metadata
+//   9: required i64 data_page_offset
+//   10: optional i64 index_page_offset
+//   11: optional i64 dictionary_page_offset
+//   12: optional Statistics statistics;
+//   13: optional list<PageEncodingStats> encoding_stats;
+//   14: optional i64 bloom_filter_offset;
+//   15: optional i32 bloom_filter_length;
+//   16: optional SizeStatistics size_statistics;
+//   17: optional GeospatialStatistics geospatial_statistics;
+// }
+pub(crate) fn serialize_column_meta_data<W: Write>(
+    column_chunk: &ColumnChunkMetaData,
+    w: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    use crate::file::statistics::page_stats_to_thrift;
+
+    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
+    column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+    let path = column_chunk.column_descr.path().parts();
+    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
+    path.write_thrift_field(w, 3, 2)?;
+    column_chunk.compression.write_thrift_field(w, 4, 3)?;
+    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
+    column_chunk
+        .total_uncompressed_size
+        .write_thrift_field(w, 6, 5)?;
+    column_chunk
+        .total_compressed_size
+        .write_thrift_field(w, 7, 6)?;
+    // no key_value_metadata here
+    let mut last_field_id = 
column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
+    if let Some(index_page_offset) = column_chunk.index_page_offset {
+        last_field_id = index_page_offset.write_thrift_field(w, 10, 
last_field_id)?;
+    }
+    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
+        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, 
last_field_id)?;
+    }
+    // PageStatistics is the same as thrift Statistics, but writable
+    let stats = page_stats_to_thrift(column_chunk.statistics());
+    if let Some(stats) = stats {
+        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+    }
+    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+        last_field_id = page_encoding_stats.write_thrift_field(w, 13, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+        last_field_id = bloom_filter_length.write_thrift_field(w, 15, 
last_field_id)?;
+    }
+
+    // SizeStatistics
+    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+        || column_chunk.repetition_level_histogram.is_some()
+        || column_chunk.definition_level_histogram.is_some()
+    {
+        let repetition_level_histogram = column_chunk
+            .repetition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        let definition_level_histogram = column_chunk
+            .definition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        Some(SizeStatistics {
+            unencoded_byte_array_data_bytes: 
column_chunk.unencoded_byte_array_data_bytes,
+            repetition_level_histogram,
+            definition_level_histogram,
+        })
+    } else {
+        None
+    };
+    if let Some(size_stats) = size_stats {
+        size_stats.write_thrift_field(w, 16, last_field_id)?;
+    }
+
+    // TODO: field 17 geo spatial stats here
+    w.write_struct_end()
+}
+
+// temp struct used for writing
+pub(crate) struct FileMeta<'a> {
+    pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
+    pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
+    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
+}
+
+impl<'a> WriteThrift for FileMeta<'a> {
+    const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    #[allow(unused_assignments)]
+    fn write_thrift<W: Write>(&self, writer: &mut 
ThriftCompactOutputProtocol<W>) -> Result<()> {
+        self.file_metadata
+            .version
+            .write_thrift_field(writer, 1, 0)?;
+
+        // field 2 is schema. do depth-first traversal of tree, converting to 
SchemaElement and
+        // writing along the way.
+        let root = self.file_metadata.schema_descr().root_schema_ptr();
+        let schema_len = num_nodes(&root);
+        writer.write_field_begin(FieldType::List, 2, 1)?;
+        writer.write_list_begin(ElementType::Struct, schema_len)?;
+        // recursively write Type nodes as SchemaElements
+        write_schema(&root, writer)?;
+
+        self.file_metadata
+            .num_rows
+            .write_thrift_field(writer, 3, 2)?;
+
+        // this will call RowGroupMetaData::write_thrift
+        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 
3)?;
+
+        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
+            last_field_id = kv_metadata.write_thrift_field(writer, 5, 
last_field_id)?;
+        }
+        if let Some(created_by) = self.file_metadata.created_by() {
+            last_field_id = created_by.write_thrift_field(writer, 6, 
last_field_id)?;
+        }
+        if let Some(column_orders) = self.file_metadata.column_orders() {
+            last_field_id = column_orders.write_thrift_field(writer, 7, 
last_field_id)?;
+        }
+        if let Some(algo) = self.encryption_algorithm.as_ref() {

Review Comment:
   
   
   I think we could avoid a lot of repetition if you just put the 
`#[cfg(not(feature = "encryption"))]` on these two fields, like
   
   ```suggestion
           #[cfg(feature = "encryption")]
           if let Some(algo) = self.encryption_algorithm.as_ref() {
   ```



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -1158,6 +1160,358 @@ impl PageHeader {
     }
 }
 
+/////////////////////////////////////////////////
+// helper functions for writing file meta data
+
+// serialize the bits of the column chunk needed for a thrift ColumnMetaData
+// struct ColumnMetaData {
+//   1: required Type type
+//   2: required list<Encoding> encodings
+//   3: required list<string> path_in_schema
+//   4: required CompressionCodec codec
+//   5: required i64 num_values
+//   6: required i64 total_uncompressed_size
+//   7: required i64 total_compressed_size
+//   8: optional list<KeyValue> key_value_metadata
+//   9: required i64 data_page_offset
+//   10: optional i64 index_page_offset
+//   11: optional i64 dictionary_page_offset
+//   12: optional Statistics statistics;
+//   13: optional list<PageEncodingStats> encoding_stats;
+//   14: optional i64 bloom_filter_offset;
+//   15: optional i32 bloom_filter_length;
+//   16: optional SizeStatistics size_statistics;
+//   17: optional GeospatialStatistics geospatial_statistics;
+// }
+pub(crate) fn serialize_column_meta_data<W: Write>(
+    column_chunk: &ColumnChunkMetaData,
+    w: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    use crate::file::statistics::page_stats_to_thrift;
+
+    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
+    column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+    let path = column_chunk.column_descr.path().parts();
+    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
+    path.write_thrift_field(w, 3, 2)?;
+    column_chunk.compression.write_thrift_field(w, 4, 3)?;
+    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
+    column_chunk
+        .total_uncompressed_size
+        .write_thrift_field(w, 6, 5)?;
+    column_chunk
+        .total_compressed_size
+        .write_thrift_field(w, 7, 6)?;
+    // no key_value_metadata here
+    let mut last_field_id = 
column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
+    if let Some(index_page_offset) = column_chunk.index_page_offset {
+        last_field_id = index_page_offset.write_thrift_field(w, 10, 
last_field_id)?;
+    }
+    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
+        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, 
last_field_id)?;
+    }
+    // PageStatistics is the same as thrift Statistics, but writable
+    let stats = page_stats_to_thrift(column_chunk.statistics());
+    if let Some(stats) = stats {
+        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+    }
+    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+        last_field_id = page_encoding_stats.write_thrift_field(w, 13, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+        last_field_id = bloom_filter_length.write_thrift_field(w, 15, 
last_field_id)?;
+    }
+
+    // SizeStatistics
+    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+        || column_chunk.repetition_level_histogram.is_some()
+        || column_chunk.definition_level_histogram.is_some()
+    {
+        let repetition_level_histogram = column_chunk
+            .repetition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        let definition_level_histogram = column_chunk
+            .definition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        Some(SizeStatistics {
+            unencoded_byte_array_data_bytes: 
column_chunk.unencoded_byte_array_data_bytes,
+            repetition_level_histogram,
+            definition_level_histogram,
+        })
+    } else {
+        None
+    };
+    if let Some(size_stats) = size_stats {
+        size_stats.write_thrift_field(w, 16, last_field_id)?;
+    }
+
+    // TODO: field 17 geo spatial stats here
+    w.write_struct_end()
+}
+
+// temp struct used for writing
+pub(crate) struct FileMeta<'a> {
+    pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
+    pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
+    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
+}
+
+impl<'a> WriteThrift for FileMeta<'a> {
+    const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    #[allow(unused_assignments)]

Review Comment:
   Is this still needed?



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -1158,6 +1160,358 @@ impl PageHeader {
     }
 }
 
+/////////////////////////////////////////////////
+// helper functions for writing file meta data
+
+// serialize the bits of the column chunk needed for a thrift ColumnMetaData
+// struct ColumnMetaData {
+//   1: required Type type
+//   2: required list<Encoding> encodings
+//   3: required list<string> path_in_schema
+//   4: required CompressionCodec codec
+//   5: required i64 num_values
+//   6: required i64 total_uncompressed_size
+//   7: required i64 total_compressed_size
+//   8: optional list<KeyValue> key_value_metadata
+//   9: required i64 data_page_offset
+//   10: optional i64 index_page_offset
+//   11: optional i64 dictionary_page_offset
+//   12: optional Statistics statistics;
+//   13: optional list<PageEncodingStats> encoding_stats;
+//   14: optional i64 bloom_filter_offset;
+//   15: optional i32 bloom_filter_length;
+//   16: optional SizeStatistics size_statistics;
+//   17: optional GeospatialStatistics geospatial_statistics;
+// }
+pub(crate) fn serialize_column_meta_data<W: Write>(
+    column_chunk: &ColumnChunkMetaData,
+    w: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    use crate::file::statistics::page_stats_to_thrift;
+
+    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
+    column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+    let path = column_chunk.column_descr.path().parts();
+    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
+    path.write_thrift_field(w, 3, 2)?;
+    column_chunk.compression.write_thrift_field(w, 4, 3)?;
+    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
+    column_chunk
+        .total_uncompressed_size
+        .write_thrift_field(w, 6, 5)?;
+    column_chunk
+        .total_compressed_size
+        .write_thrift_field(w, 7, 6)?;
+    // no key_value_metadata here
+    let mut last_field_id = 
column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
+    if let Some(index_page_offset) = column_chunk.index_page_offset {
+        last_field_id = index_page_offset.write_thrift_field(w, 10, 
last_field_id)?;
+    }
+    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
+        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, 
last_field_id)?;
+    }
+    // PageStatistics is the same as thrift Statistics, but writable
+    let stats = page_stats_to_thrift(column_chunk.statistics());
+    if let Some(stats) = stats {
+        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+    }
+    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+        last_field_id = page_encoding_stats.write_thrift_field(w, 13, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+        last_field_id = bloom_filter_length.write_thrift_field(w, 15, 
last_field_id)?;
+    }
+
+    // SizeStatistics
+    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+        || column_chunk.repetition_level_histogram.is_some()
+        || column_chunk.definition_level_histogram.is_some()
+    {
+        let repetition_level_histogram = column_chunk
+            .repetition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        let definition_level_histogram = column_chunk
+            .definition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        Some(SizeStatistics {
+            unencoded_byte_array_data_bytes: 
column_chunk.unencoded_byte_array_data_bytes,
+            repetition_level_histogram,
+            definition_level_histogram,
+        })
+    } else {
+        None
+    };
+    if let Some(size_stats) = size_stats {
+        size_stats.write_thrift_field(w, 16, last_field_id)?;
+    }
+
+    // TODO: field 17 geo spatial stats here
+    w.write_struct_end()
+}
+
+// temp struct used for writing
+pub(crate) struct FileMeta<'a> {
+    pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
+    pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
+    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
+}
+
+impl<'a> WriteThrift for FileMeta<'a> {
+    const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    #[allow(unused_assignments)]
+    fn write_thrift<W: Write>(&self, writer: &mut 
ThriftCompactOutputProtocol<W>) -> Result<()> {
+        self.file_metadata
+            .version
+            .write_thrift_field(writer, 1, 0)?;
+
+        // field 2 is schema. do depth-first traversal of tree, converting to 
SchemaElement and
+        // writing along the way.
+        let root = self.file_metadata.schema_descr().root_schema_ptr();
+        let schema_len = num_nodes(&root);
+        writer.write_field_begin(FieldType::List, 2, 1)?;
+        writer.write_list_begin(ElementType::Struct, schema_len)?;
+        // recursively write Type nodes as SchemaElements
+        write_schema(&root, writer)?;
+
+        self.file_metadata
+            .num_rows
+            .write_thrift_field(writer, 3, 2)?;
+
+        // this will call RowGroupMetaData::write_thrift
+        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 
3)?;
+
+        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
+            last_field_id = kv_metadata.write_thrift_field(writer, 5, 
last_field_id)?;
+        }
+        if let Some(created_by) = self.file_metadata.created_by() {
+            last_field_id = created_by.write_thrift_field(writer, 6, 
last_field_id)?;
+        }
+        if let Some(column_orders) = self.file_metadata.column_orders() {
+            last_field_id = column_orders.write_thrift_field(writer, 7, 
last_field_id)?;
+        }
+        if let Some(algo) = self.encryption_algorithm.as_ref() {
+            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
+        }
+        if let Some(key) = self.footer_signing_key_metadata.as_ref() {
+            key.as_slice()
+                .write_thrift_field(writer, 9, last_field_id)?;
+        }
+
+        writer.write_struct_end()
+    }
+}
+
+fn write_schema<W: Write>(
+    node: &TypePtr,
+    writer: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    match node.as_ref() {
+        crate::schema::types::Type::PrimitiveType {
+            basic_info,
+            physical_type,
+            type_length,
+            scale,
+            precision,
+        } => {
+            let element = SchemaElement {
+                type_: Some(*physical_type),
+                type_length: if *type_length >= 0 {
+                    Some(*type_length)
+                } else {
+                    None
+                },
+                repetition_type: Some(basic_info.repetition()),
+                name: basic_info.name(),
+                num_children: None,
+                converted_type: match basic_info.converted_type() {
+                    ConvertedType::NONE => None,
+                    other => Some(other),
+                },
+                scale: if *scale >= 0 { Some(*scale) } else { None },
+                precision: if *precision >= 0 {
+                    Some(*precision)
+                } else {
+                    None
+                },
+                field_id: if basic_info.has_id() {
+                    Some(basic_info.id())
+                } else {
+                    None
+                },
+                logical_type: basic_info.logical_type(),
+            };
+            element.write_thrift(writer)
+        }
+        crate::schema::types::Type::GroupType { basic_info, fields } => {
+            let repetition = if basic_info.has_repetition() {
+                Some(basic_info.repetition())
+            } else {
+                None
+            };
+
+            let element = SchemaElement {
+                type_: None,
+                type_length: None,
+                repetition_type: repetition,
+                name: basic_info.name(),
+                num_children: Some(fields.len() as i32),
+                converted_type: match basic_info.converted_type() {
+                    ConvertedType::NONE => None,
+                    other => Some(other),
+                },
+                scale: None,
+                precision: None,
+                field_id: if basic_info.has_id() {
+                    Some(basic_info.id())
+                } else {
+                    None
+                },
+                logical_type: basic_info.logical_type(),
+            };
+
+            element.write_thrift(writer)?;
+
+            // Add child elements for a group
+            for field in fields {
+                write_schema(field, writer)?;
+            }
+            Ok(())
+        }
+    }
+}
+
+// struct RowGroup {

Review Comment:
   I double checked how to find this, and see that this maps straightforwardly 
to  the original thrift 👍 
   
   
https://github.com/apache/parquet-format/blob/9fd57b59e0ce1a82a69237dcf8977d3e72a2965d/src/main/thrift/parquet.thrift#L1001



##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -1158,6 +1160,358 @@ impl PageHeader {
     }
 }
 
+/////////////////////////////////////////////////
+// helper functions for writing file meta data
+
+// serialize the bits of the column chunk needed for a thrift ColumnMetaData
+// struct ColumnMetaData {
+//   1: required Type type
+//   2: required list<Encoding> encodings
+//   3: required list<string> path_in_schema
+//   4: required CompressionCodec codec
+//   5: required i64 num_values
+//   6: required i64 total_uncompressed_size
+//   7: required i64 total_compressed_size
+//   8: optional list<KeyValue> key_value_metadata
+//   9: required i64 data_page_offset
+//   10: optional i64 index_page_offset
+//   11: optional i64 dictionary_page_offset
+//   12: optional Statistics statistics;
+//   13: optional list<PageEncodingStats> encoding_stats;
+//   14: optional i64 bloom_filter_offset;
+//   15: optional i32 bloom_filter_length;
+//   16: optional SizeStatistics size_statistics;
+//   17: optional GeospatialStatistics geospatial_statistics;
+// }
+pub(crate) fn serialize_column_meta_data<W: Write>(
+    column_chunk: &ColumnChunkMetaData,
+    w: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    use crate::file::statistics::page_stats_to_thrift;
+
+    // Required fields 1-7 are written unconditionally. The last argument to
+    // `write_thrift_field` is the id of the previously written field
+    // (presumably so the compact protocol can delta-encode field ids --
+    // TODO confirm against the WriteThriftField trait), and each call
+    // returns the id it just wrote.
+    column_chunk.column_type().write_thrift_field(w, 1, 0)?;
+    column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+    let path = column_chunk.column_descr.path().parts();
+    let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
+    path.write_thrift_field(w, 3, 2)?;
+    column_chunk.compression.write_thrift_field(w, 4, 3)?;
+    column_chunk.num_values.write_thrift_field(w, 5, 4)?;
+    column_chunk
+        .total_uncompressed_size
+        .write_thrift_field(w, 6, 5)?;
+    column_chunk
+        .total_compressed_size
+        .write_thrift_field(w, 7, 6)?;
+    // no key_value_metadata here (thrift field 8 is deliberately skipped,
+    // so field 9 is written with previous id 7)
+    // From field 10 on everything is optional; track the id of the last
+    // field actually written so the next write gets the correct previous id.
+    let mut last_field_id = 
column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
+    if let Some(index_page_offset) = column_chunk.index_page_offset {
+        last_field_id = index_page_offset.write_thrift_field(w, 10, 
last_field_id)?;
+    }
+    if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
+        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, 
last_field_id)?;
+    }
+    // PageStatistics is the same as thrift Statistics, but writable
+    let stats = page_stats_to_thrift(column_chunk.statistics());
+    if let Some(stats) = stats {
+        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+    }
+    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+        last_field_id = page_encoding_stats.write_thrift_field(w, 13, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, 
last_field_id)?;
+    }
+    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+        last_field_id = bloom_filter_length.write_thrift_field(w, 15, 
last_field_id)?;
+    }
+
+    // SizeStatistics (field 16) is only materialized -- and written -- when
+    // at least one of its three components is present; otherwise the
+    // optional field is omitted entirely.
+    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+        || column_chunk.repetition_level_histogram.is_some()
+        || column_chunk.definition_level_histogram.is_some()
+    {
+        // histograms are cloned out of their newtype wrappers into plain vecs
+        let repetition_level_histogram = column_chunk
+            .repetition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        let definition_level_histogram = column_chunk
+            .definition_level_histogram()
+            .map(|hist| hist.clone().into_inner());
+
+        Some(SizeStatistics {
+            unencoded_byte_array_data_bytes: 
column_chunk.unencoded_byte_array_data_bytes,
+            repetition_level_histogram,
+            definition_level_histogram,
+        })
+    } else {
+        None
+    };
+    if let Some(size_stats) = size_stats {
+        // returned field id is discarded: nothing after field 16 is written yet
+        size_stats.write_thrift_field(w, 16, last_field_id)?;
+    }
+
+    // TODO: field 17 geo spatial stats here
+    w.write_struct_end()
+}
+
+// temp struct used for writing: a borrow-only bundle of the pieces needed to
+// serialize the thrift FileMetaData footer (see the WriteThrift impl below)
+pub(crate) struct FileMeta<'a> {
+    // source of version, schema, num_rows, key/value metadata, created_by,
+    // and column orders (thrift fields 1-3 and 5-7)
+    pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
+    // serialized as thrift field 4
+    pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+    // thrift field 8; only written when present
+    pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
+    // thrift field 9; only written when present
+    pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
+}
+
+impl<'a> WriteThrift for FileMeta<'a> {
+    const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+    // `last_field_id` assignments in the trailing optional-field branches can
+    // be dead (e.g. when field 9 is the last one written), hence the allow.
+    #[allow(unused_assignments)]
+    fn write_thrift<W: Write>(&self, writer: &mut 
ThriftCompactOutputProtocol<W>) -> Result<()> {
+        // field 1: format version (previous field id is 0 at struct start)
+        self.file_metadata
+            .version
+            .write_thrift_field(writer, 1, 0)?;
+
+        // field 2 is the schema, a flattened list of SchemaElement. Rather
+        // than collecting the list first, the field/list headers are written
+        // manually and the tree is streamed via a depth-first traversal,
+        // converting each Type node to a SchemaElement along the way.
+        let root = self.file_metadata.schema_descr().root_schema_ptr();
+        let schema_len = num_nodes(&root);
+        writer.write_field_begin(FieldType::List, 2, 1)?;
+        writer.write_list_begin(ElementType::Struct, schema_len)?;
+        // recursively write Type nodes as SchemaElements
+        write_schema(&root, writer)?;
+
+        // field 3: total row count
+        self.file_metadata
+            .num_rows
+            .write_thrift_field(writer, 3, 2)?;
+
+        // field 4: this will call RowGroupMetaData::write_thrift per element.
+        // From here on the fields are optional, so the id of the last field
+        // actually written is threaded through each subsequent call.
+        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 
3)?;
+
+        if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
+            last_field_id = kv_metadata.write_thrift_field(writer, 5, 
last_field_id)?;
+        }
+        if let Some(created_by) = self.file_metadata.created_by() {
+            last_field_id = created_by.write_thrift_field(writer, 6, 
last_field_id)?;
+        }
+        if let Some(column_orders) = self.file_metadata.column_orders() {
+            last_field_id = column_orders.write_thrift_field(writer, 7, 
last_field_id)?;
+        }
+        if let Some(algo) = self.encryption_algorithm.as_ref() {
+            last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
+        }
+        if let Some(key) = self.footer_signing_key_metadata.as_ref() {
+            // field 9 is the last field, so its returned id is discarded
+            key.as_slice()
+                .write_thrift_field(writer, 9, last_field_id)?;
+        }
+
+        writer.write_struct_end()
+    }
+}
+
+fn write_schema<W: Write>(
+    node: &TypePtr,
+    writer: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+    match node.as_ref() {
+        crate::schema::types::Type::PrimitiveType {
+            basic_info,
+            physical_type,
+            type_length,
+            scale,
+            precision,
+        } => {
+            let element = SchemaElement {
+                type_: Some(*physical_type),
+                type_length: if *type_length >= 0 {
+                    Some(*type_length)
+                } else {
+                    None
+                },
+                repetition_type: Some(basic_info.repetition()),
+                name: basic_info.name(),
+                num_children: None,
+                converted_type: match basic_info.converted_type() {
+                    ConvertedType::NONE => None,
+                    other => Some(other),
+                },
+                scale: if *scale >= 0 { Some(*scale) } else { None },
+                precision: if *precision >= 0 {
+                    Some(*precision)
+                } else {
+                    None
+                },
+                field_id: if basic_info.has_id() {
+                    Some(basic_info.id())
+                } else {
+                    None
+                },
+                logical_type: basic_info.logical_type(),
+            };
+            element.write_thrift(writer)
+        }
+        crate::schema::types::Type::GroupType { basic_info, fields } => {
+            let repetition = if basic_info.has_repetition() {
+                Some(basic_info.repetition())
+            } else {
+                None
+            };
+
+            let element = SchemaElement {
+                type_: None,
+                type_length: None,
+                repetition_type: repetition,
+                name: basic_info.name(),
+                num_children: Some(fields.len() as i32),

Review Comment:
   Maybe, as a follow-on, we should validate this limit (i.e. that there are 
no more than `i32::MAX` fields, since `fields.len()` is cast with `as i32` 🤔)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to