etseidl commented on code in PR #8445:
URL: https://github.com/apache/arrow-rs/pull/8445#discussion_r2382592093
##########
parquet/src/file/metadata/thrift_gen.rs:
##########
@@ -1158,6 +1160,358 @@ impl PageHeader {
}
}
+/////////////////////////////////////////////////
+// helper functions for writing file meta data
+
+// serialize the bits of the column chunk needed for a thrift ColumnMetaData
+// struct ColumnMetaData {
+// 1: required Type type
+// 2: required list<Encoding> encodings
+// 3: required list<string> path_in_schema
+// 4: required CompressionCodec codec
+// 5: required i64 num_values
+// 6: required i64 total_uncompressed_size
+// 7: required i64 total_compressed_size
+// 8: optional list<KeyValue> key_value_metadata
+// 9: required i64 data_page_offset
+// 10: optional i64 index_page_offset
+// 11: optional i64 dictionary_page_offset
+// 12: optional Statistics statistics;
+// 13: optional list<PageEncodingStats> encoding_stats;
+// 14: optional i64 bloom_filter_offset;
+// 15: optional i32 bloom_filter_length;
+// 16: optional SizeStatistics size_statistics;
+// 17: optional GeospatialStatistics geospatial_statistics;
+// }
+pub(crate) fn serialize_column_meta_data<W: Write>(
+ column_chunk: &ColumnChunkMetaData,
+ w: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+ use crate::file::statistics::page_stats_to_thrift;
+
+ column_chunk.column_type().write_thrift_field(w, 1, 0)?;
+ column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+ let path = column_chunk.column_descr.path().parts();
+ let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
+ path.write_thrift_field(w, 3, 2)?;
+ column_chunk.compression.write_thrift_field(w, 4, 3)?;
+ column_chunk.num_values.write_thrift_field(w, 5, 4)?;
+ column_chunk
+ .total_uncompressed_size
+ .write_thrift_field(w, 6, 5)?;
+ column_chunk
+ .total_compressed_size
+ .write_thrift_field(w, 7, 6)?;
+ // no key_value_metadata here
+    let mut last_field_id = column_chunk.data_page_offset.write_thrift_field(w, 9, 7)?;
+ if let Some(index_page_offset) = column_chunk.index_page_offset {
+        last_field_id = index_page_offset.write_thrift_field(w, 10, last_field_id)?;
+ }
+ if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
+        last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
+ }
+ // PageStatistics is the same as thrift Statistics, but writable
+ let stats = page_stats_to_thrift(column_chunk.statistics());
+ if let Some(stats) = stats {
+ last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+ }
+ if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
+ }
+ if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
+ }
+ if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
+ }
+
+ // SizeStatistics
+ let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+ || column_chunk.repetition_level_histogram.is_some()
+ || column_chunk.definition_level_histogram.is_some()
+ {
+ let repetition_level_histogram = column_chunk
+ .repetition_level_histogram()
+ .map(|hist| hist.clone().into_inner());
+
+ let definition_level_histogram = column_chunk
+ .definition_level_histogram()
+ .map(|hist| hist.clone().into_inner());
+
+ Some(SizeStatistics {
+            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
+ repetition_level_histogram,
+ definition_level_histogram,
+ })
+ } else {
+ None
+ };
+ if let Some(size_stats) = size_stats {
+ size_stats.write_thrift_field(w, 16, last_field_id)?;
+ }
+
+ // TODO: field 17 geo spatial stats here
+ w.write_struct_end()
+}
+
+// temp struct used for writing
+pub(crate) struct FileMeta<'a> {
+ pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
+ pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+ pub(crate) encryption_algorithm: Option<EncryptionAlgorithm>,
+ pub(crate) footer_signing_key_metadata: Option<Vec<u8>>,
+}
+
+impl<'a> WriteThrift for FileMeta<'a> {
+ const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+ #[allow(unused_assignments)]
+    fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
+ self.file_metadata
+ .version
+ .write_thrift_field(writer, 1, 0)?;
+
+        // field 2 is schema. do depth-first traversal of tree, converting to SchemaElement and
+ // writing along the way.
+ let root = self.file_metadata.schema_descr().root_schema_ptr();
+ let schema_len = num_nodes(&root);
+ writer.write_field_begin(FieldType::List, 2, 1)?;
+ writer.write_list_begin(ElementType::Struct, schema_len)?;
+ // recursively write Type nodes as SchemaElements
+ write_schema(&root, writer)?;
+
+ self.file_metadata
+ .num_rows
+ .write_thrift_field(writer, 3, 2)?;
+
+ // this will call RowGroupMetaData::write_thrift
+        let mut last_field_id = self.row_groups.write_thrift_field(writer, 4, 3)?;
+
+ if let Some(kv_metadata) = self.file_metadata.key_value_metadata() {
+            last_field_id = kv_metadata.write_thrift_field(writer, 5, last_field_id)?;
+ }
+ if let Some(created_by) = self.file_metadata.created_by() {
+            last_field_id = created_by.write_thrift_field(writer, 6, last_field_id)?;
+ }
+ if let Some(column_orders) = self.file_metadata.column_orders() {
+            last_field_id = column_orders.write_thrift_field(writer, 7, last_field_id)?;
+ }
+ if let Some(algo) = self.encryption_algorithm.as_ref() {
+ last_field_id = algo.write_thrift_field(writer, 8, last_field_id)?;
+ }
+ if let Some(key) = self.footer_signing_key_metadata.as_ref() {
+ key.as_slice()
+ .write_thrift_field(writer, 9, last_field_id)?;
+ }
+
+ writer.write_struct_end()
+ }
+}
+
+fn write_schema<W: Write>(
+ node: &TypePtr,
+ writer: &mut ThriftCompactOutputProtocol<W>,
+) -> Result<()> {
+ match node.as_ref() {
+ crate::schema::types::Type::PrimitiveType {
+ basic_info,
+ physical_type,
+ type_length,
+ scale,
+ precision,
+ } => {
+ let element = SchemaElement {
+ type_: Some(*physical_type),
+ type_length: if *type_length >= 0 {
+ Some(*type_length)
+ } else {
+ None
+ },
+ repetition_type: Some(basic_info.repetition()),
+ name: basic_info.name(),
+ num_children: None,
+ converted_type: match basic_info.converted_type() {
+ ConvertedType::NONE => None,
+ other => Some(other),
+ },
+ scale: if *scale >= 0 { Some(*scale) } else { None },
+ precision: if *precision >= 0 {
+ Some(*precision)
+ } else {
+ None
+ },
+ field_id: if basic_info.has_id() {
+ Some(basic_info.id())
+ } else {
+ None
+ },
+ logical_type: basic_info.logical_type(),
+ };
+ element.write_thrift(writer)
+ }
+ crate::schema::types::Type::GroupType { basic_info, fields } => {
+ let repetition = if basic_info.has_repetition() {
+ Some(basic_info.repetition())
+ } else {
+ None
+ };
+
+ let element = SchemaElement {
+ type_: None,
+ type_length: None,
+ repetition_type: repetition,
+ name: basic_info.name(),
+ num_children: Some(fields.len() as i32),
Review Comment:
That would be a BIG schema, but I could switch to a `try`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]