This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cdd6eb1 fix(metadata): export iceberg schema in snapshots table (#1135)
5cdd6eb1 is described below

commit 5cdd6eb1d6f7352e6da6e63406bd488c7d322858
Author: xxchan <[email protected]>
AuthorDate: Mon Mar 31 21:20:06 2025 +0800

    fix(metadata): export iceberg schema in snapshots table (#1135)
    
    same as #871
    
    
    
    ---------
    
    Signed-off-by: xxchan <[email protected]>
---
 Cargo.toml                              |  18 ++---
 crates/iceberg/src/inspect/snapshots.rs | 115 ++++++++++++++++++++------------
 2 files changed, 80 insertions(+), 53 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 26a0ae22..730d156c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -42,14 +42,14 @@ rust-version = "1.77.1"
 anyhow = "1.0.72"
 apache-avro = "0.17"
 array-init = "2"
-arrow-arith = { version = "54.1.0" }
-arrow-array = { version = "54.1.0" }
-arrow-buffer = { version = "54.1.0" }
-arrow-cast = { version = "54.1.0" }
-arrow-ord = { version = "54.1.0" }
-arrow-schema = { version = "54.1.0" }
-arrow-select = { version = "54.1.0" }
-arrow-string = { version = "54.1.0" }
+arrow-arith = { version = "54.2.0" }
+arrow-array = { version = "54.2.0" }
+arrow-buffer = { version = "54.2.0" }
+arrow-cast = { version = "54.2.0" }
+arrow-ord = { version = "54.2.0" }
+arrow-schema = { version = "54.2.0" }
+arrow-select = { version = "54.2.0" }
+arrow-string = { version = "54.2.0" }
 async-trait = "0.1.86"
 async-std = "1.12"
 aws-config = "1"
@@ -76,7 +76,7 @@ num-bigint = "0.4.6"
 once_cell = "1.20"
 opendal = "0.51.2"
 ordered-float = "4"
-parquet = "54.1.0"
+parquet = "54.2.0"
 pilota = "0.11.2"
 pretty_assertions = "1.4"
 port_scanner = "0.1.5"
diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs
index 1ee89963..63175562 100644
--- a/crates/iceberg/src/inspect/snapshots.rs
+++ b/crates/iceberg/src/inspect/snapshots.rs
@@ -15,15 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
-use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
-use arrow_array::types::{Int64Type, TimestampMillisecondType};
+use arrow_array::builder::{MapBuilder, MapFieldNames, PrimitiveBuilder, StringBuilder};
+use arrow_array::types::{Int64Type, TimestampMicrosecondType};
 use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use arrow_schema::{DataType, Field};
 use futures::{stream, StreamExt};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
+use crate::arrow::{schema_to_arrow_schema, DEFAULT_MAP_FIELD_NAME};
 use crate::scan::ArrowRecordBatchStream;
+use crate::spec::{
+    MapType, NestedField, PrimitiveType, Type, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME,
+};
 use crate::table::Table;
 use crate::Result;
 
@@ -38,51 +44,72 @@ impl<'a> SnapshotsTable<'a> {
         Self { table }
     }
 
-    /// Returns the schema of the snapshots table.
-    pub fn schema(&self) -> Schema {
-        Schema::new(vec![
-            Field::new(
+    /// Returns the iceberg schema of the snapshots table.
+    pub fn schema(&self) -> crate::spec::Schema {
+        let fields = vec![
+            NestedField::required(
+                1,
                 "committed_at",
-                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
-                false,
+                Type::Primitive(PrimitiveType::Timestamptz),
             ),
-            Field::new("snapshot_id", DataType::Int64, false),
-            Field::new("parent_id", DataType::Int64, true),
-            Field::new("operation", DataType::Utf8, false),
-            Field::new("manifest_list", DataType::Utf8, false),
-            Field::new(
+            NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)),
+            NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)),
+            NestedField::optional(4, "operation", Type::Primitive(PrimitiveType::String)),
+            NestedField::optional(5, "manifest_list", Type::Primitive(PrimitiveType::String)),
+            NestedField::optional(
+                6,
                 "summary",
-                DataType::Map(
-                    Arc::new(Field::new(
-                        "entries",
-                        DataType::Struct(
-                            vec![
-                                Field::new("keys", DataType::Utf8, false),
-                                Field::new("values", DataType::Utf8, true),
-                            ]
-                            .into(),
-                        ),
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::map_key_element(
+                        7,
+                        Type::Primitive(PrimitiveType::String),
+                    )),
+                    value_field: Arc::new(NestedField::map_value_element(
+                        8,
+                        Type::Primitive(PrimitiveType::String),
                         false,
                     )),
-                    false,
-                ),
-                false,
+                }),
             ),
-        ])
+        ];
+        crate::spec::Schema::builder()
+            .with_fields(fields.into_iter().map(|f| f.into()))
+            .build()
+            .unwrap()
     }
 
     /// Scans the snapshots table.
     pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
+        let schema = schema_to_arrow_schema(&self.schema())?;
+
         let mut committed_at =
-            PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
+            PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00");
         let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
         let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
         let mut operation = StringBuilder::new();
         let mut manifest_list = StringBuilder::new();
-        let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
-
+        let mut summary = MapBuilder::new(
+            Some(MapFieldNames {
+                entry: DEFAULT_MAP_FIELD_NAME.to_string(),
+                key: MAP_KEY_FIELD_NAME.to_string(),
+                value: MAP_VALUE_FIELD_NAME.to_string(),
+            }),
+            StringBuilder::new(),
+            StringBuilder::new(),
+        )
+        .with_keys_field(Arc::new(
+            Field::new(MAP_KEY_FIELD_NAME, DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "7".to_string(),
+            )])),
+        ))
+        .with_values_field(Arc::new(
+            Field::new(MAP_VALUE_FIELD_NAME, DataType::Utf8, true).with_metadata(HashMap::from([
+                (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()),
+            ])),
+        ));
         for snapshot in self.table.metadata().snapshots() {
-            committed_at.append_value(snapshot.timestamp_ms());
+            committed_at.append_value(snapshot.timestamp_ms() * 1000);
             snapshot_id.append_value(snapshot.snapshot_id());
             parent_id.append_option(snapshot.parent_snapshot_id());
             manifest_list.append_value(snapshot.manifest_list());
@@ -94,7 +121,7 @@ impl<'a> SnapshotsTable<'a> {
             summary.append(true)?;
         }
 
-        let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![
             Arc::new(committed_at.finish()),
             Arc::new(snapshot_id.finish()),
             Arc::new(parent_id.finish()),
@@ -123,14 +150,14 @@ mod tests {
         check_record_batches(
             batch_stream,
             expect![[r#"
-                Field { name: "committed_at", data_type: 
Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} },
-                Field { name: "snapshot_id", data_type: Int64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} },
-                Field { name: "parent_id", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} },
-                Field { name: "operation", data_type: Utf8, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} },
-                Field { name: "manifest_list", data_type: Utf8, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} },
-                Field { name: "summary", data_type: Map(Field { name: 
"entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: 
"values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }"#]],
+                Field { name: "committed_at", data_type: 
Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
+                Field { name: "snapshot_id", data_type: Int64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} 
},
+                Field { name: "parent_id", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
+                Field { name: "operation", data_type: Utf8, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
+                Field { name: "manifest_list", data_type: Utf8, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
+                Field { name: "summary", data_type: Map(Field { name: 
"key_value", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} 
}, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }]), nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: 
true, dict_id: 0, dict_is_ordered: false [...]
             expect![[r#"
-                committed_at: PrimitiveArray<Timestamp(Millisecond, 
Some("+00:00"))>
+                committed_at: PrimitiveArray<Timestamp(Microsecond, 
Some("+00:00"))>
                 [
                   2018-01-04T21:22:35.770+00:00,
                   2019-04-12T20:29:15.770+00:00,
@@ -158,11 +185,11 @@ mod tests {
                 [
                 ]
                 [
-                -- child 0: "keys" (Utf8)
+                -- child 0: "key" (Utf8)
                 StringArray
                 [
                 ]
-                -- child 1: "values" (Utf8)
+                -- child 1: "value" (Utf8)
                 StringArray
                 [
                 ]
@@ -172,11 +199,11 @@ mod tests {
                 [
                 ]
                 [
-                -- child 0: "keys" (Utf8)
+                -- child 0: "key" (Utf8)
                 StringArray
                 [
                 ]
-                -- child 1: "values" (Utf8)
+                -- child 1: "value" (Utf8)
                 StringArray
                 [
                 ]

Reply via email to