This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5cdd6eb1 fix(metadata): export iceberg schema in snapshots table (#1135)
5cdd6eb1 is described below
commit 5cdd6eb1d6f7352e6da6e63406bd488c7d322858
Author: xxchan <[email protected]>
AuthorDate: Mon Mar 31 21:20:06 2025 +0800
fix(metadata): export iceberg schema in snapshots table (#1135)
same as #871
---------
Signed-off-by: xxchan <[email protected]>
---
Cargo.toml | 18 ++---
crates/iceberg/src/inspect/snapshots.rs | 115 ++++++++++++++++++++------------
2 files changed, 80 insertions(+), 53 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 26a0ae22..730d156c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -42,14 +42,14 @@ rust-version = "1.77.1"
anyhow = "1.0.72"
apache-avro = "0.17"
array-init = "2"
-arrow-arith = { version = "54.1.0" }
-arrow-array = { version = "54.1.0" }
-arrow-buffer = { version = "54.1.0" }
-arrow-cast = { version = "54.1.0" }
-arrow-ord = { version = "54.1.0" }
-arrow-schema = { version = "54.1.0" }
-arrow-select = { version = "54.1.0" }
-arrow-string = { version = "54.1.0" }
+arrow-arith = { version = "54.2.0" }
+arrow-array = { version = "54.2.0" }
+arrow-buffer = { version = "54.2.0" }
+arrow-cast = { version = "54.2.0" }
+arrow-ord = { version = "54.2.0" }
+arrow-schema = { version = "54.2.0" }
+arrow-select = { version = "54.2.0" }
+arrow-string = { version = "54.2.0" }
async-trait = "0.1.86"
async-std = "1.12"
aws-config = "1"
@@ -76,7 +76,7 @@ num-bigint = "0.4.6"
once_cell = "1.20"
opendal = "0.51.2"
ordered-float = "4"
-parquet = "54.1.0"
+parquet = "54.2.0"
pilota = "0.11.2"
pretty_assertions = "1.4"
port_scanner = "0.1.5"
diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs
index 1ee89963..63175562 100644
--- a/crates/iceberg/src/inspect/snapshots.rs
+++ b/crates/iceberg/src/inspect/snapshots.rs
@@ -15,15 +15,21 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::sync::Arc;
-use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
-use arrow_array::types::{Int64Type, TimestampMillisecondType};
+use arrow_array::builder::{MapBuilder, MapFieldNames, PrimitiveBuilder, StringBuilder};
+use arrow_array::types::{Int64Type, TimestampMicrosecondType};
use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use arrow_schema::{DataType, Field};
use futures::{stream, StreamExt};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use crate::arrow::{schema_to_arrow_schema, DEFAULT_MAP_FIELD_NAME};
use crate::scan::ArrowRecordBatchStream;
+use crate::spec::{
+ MapType, NestedField, PrimitiveType, Type, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME,
+};
use crate::table::Table;
use crate::Result;
@@ -38,51 +44,72 @@ impl<'a> SnapshotsTable<'a> {
Self { table }
}
- /// Returns the schema of the snapshots table.
- pub fn schema(&self) -> Schema {
- Schema::new(vec![
- Field::new(
+ /// Returns the iceberg schema of the snapshots table.
+ pub fn schema(&self) -> crate::spec::Schema {
+ let fields = vec![
+ NestedField::required(
+ 1,
"committed_at",
- DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
- false,
+ Type::Primitive(PrimitiveType::Timestamptz),
),
- Field::new("snapshot_id", DataType::Int64, false),
- Field::new("parent_id", DataType::Int64, true),
- Field::new("operation", DataType::Utf8, false),
- Field::new("manifest_list", DataType::Utf8, false),
- Field::new(
+ NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)),
+ NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)),
+ NestedField::optional(4, "operation", Type::Primitive(PrimitiveType::String)),
+ NestedField::optional(5, "manifest_list", Type::Primitive(PrimitiveType::String)),
+ NestedField::optional(
+ 6,
"summary",
- DataType::Map(
- Arc::new(Field::new(
- "entries",
- DataType::Struct(
- vec![
- Field::new("keys", DataType::Utf8, false),
- Field::new("values", DataType::Utf8, true),
- ]
- .into(),
- ),
+ Type::Map(MapType {
+ key_field: Arc::new(NestedField::map_key_element(
+ 7,
+ Type::Primitive(PrimitiveType::String),
+ )),
+ value_field: Arc::new(NestedField::map_value_element(
+ 8,
+ Type::Primitive(PrimitiveType::String),
false,
)),
- false,
- ),
- false,
+ }),
),
- ])
+ ];
+ crate::spec::Schema::builder()
+ .with_fields(fields.into_iter().map(|f| f.into()))
+ .build()
+ .unwrap()
}
/// Scans the snapshots table.
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
+ let schema = schema_to_arrow_schema(&self.schema())?;
+
let mut committed_at =
- PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
+ PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00");
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
let mut operation = StringBuilder::new();
let mut manifest_list = StringBuilder::new();
- let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
-
+ let mut summary = MapBuilder::new(
+ Some(MapFieldNames {
+ entry: DEFAULT_MAP_FIELD_NAME.to_string(),
+ key: MAP_KEY_FIELD_NAME.to_string(),
+ value: MAP_VALUE_FIELD_NAME.to_string(),
+ }),
+ StringBuilder::new(),
+ StringBuilder::new(),
+ )
+ .with_keys_field(Arc::new(
+ Field::new(MAP_KEY_FIELD_NAME, DataType::Utf8, false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "7".to_string(),
+ )])),
+ ))
+ .with_values_field(Arc::new(
+ Field::new(MAP_VALUE_FIELD_NAME, DataType::Utf8, true).with_metadata(HashMap::from([
+ (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()),
+ ])),
+ ));
for snapshot in self.table.metadata().snapshots() {
- committed_at.append_value(snapshot.timestamp_ms());
+ committed_at.append_value(snapshot.timestamp_ms() * 1000);
snapshot_id.append_value(snapshot.snapshot_id());
parent_id.append_option(snapshot.parent_snapshot_id());
manifest_list.append_value(snapshot.manifest_list());
@@ -94,7 +121,7 @@ impl<'a> SnapshotsTable<'a> {
summary.append(true)?;
}
- let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
+ let batch = RecordBatch::try_new(Arc::new(schema), vec![
Arc::new(committed_at.finish()),
Arc::new(snapshot_id.finish()),
Arc::new(parent_id.finish()),
@@ -123,14 +150,14 @@ mod tests {
check_record_batches(
batch_stream,
expect![[r#"
- Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
- Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
- Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
- Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
- Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
- Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
+ Field { name: "committed_at", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
+ Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+ Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
+ Field { name: "operation", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
+ Field { name: "manifest_list", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
+ Field { name: "summary", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false [...]
expect![[r#"
- committed_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
+ committed_at: PrimitiveArray<Timestamp(Microsecond, Some("+00:00"))>
[
2018-01-04T21:22:35.770+00:00,
2019-04-12T20:29:15.770+00:00,
@@ -158,11 +185,11 @@ mod tests {
[
]
[
- -- child 0: "keys" (Utf8)
+ -- child 0: "key" (Utf8)
StringArray
[
]
- -- child 1: "values" (Utf8)
+ -- child 1: "value" (Utf8)
StringArray
[
]
@@ -172,11 +199,11 @@ mod tests {
[
]
[
- -- child 0: "keys" (Utf8)
+ -- child 0: "key" (Utf8)
StringArray
[
]
- -- child 1: "values" (Utf8)
+ -- child 1: "value" (Utf8)
StringArray
[
]