This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 36cc1208 feat(datafusion): support metadata tables for Datafusion
(#879)
36cc1208 is described below
commit 36cc12087bc118f1fe10efa6b30db98bd9655ba7
Author: xxchan <[email protected]>
AuthorDate: Wed Jul 2 00:16:43 2025 +0800
feat(datafusion): support metadata tables for Datafusion (#879)
Use the newly added metadata tables #823 to support this feature in the
Datafusion engine to demonstrate its usage.
This may also help design and stabilize the interface of
metadata tables in the `iceberg` crate.
---------
Signed-off-by: xxchan <[email protected]>
---
Cargo.lock | 28 ++++-
Cargo.toml | 1 +
crates/iceberg/Cargo.toml | 2 +
crates/iceberg/src/inspect/manifests.rs | 7 +-
crates/iceberg/src/inspect/metadata_table.rs | 103 ++++++---------
crates/iceberg/src/inspect/snapshots.rs | 7 +-
crates/iceberg/src/lib.rs | 1 +
crates/iceberg/src/test_utils.rs | 78 ++++++++++++
crates/integrations/datafusion/Cargo.toml | 1 +
.../datafusion/src/physical_plan/metadata_scan.rs | 92 ++++++++++++++
.../datafusion/src/physical_plan/mod.rs | 1 +
crates/integrations/datafusion/src/schema.rs | 47 +++++--
.../datafusion/src/table/metadata_table.rs | 87 +++++++++++++
crates/integrations/datafusion/src/table/mod.rs | 10 ++
.../tests/integration_datafusion_test.rs | 138 ++++++++++++++++++++-
15 files changed, 517 insertions(+), 86 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index ca88923c..a356bb38 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -181,8 +181,8 @@ dependencies = [
"serde_bytes",
"serde_json",
"snap",
- "strum",
- "strum_macros",
+ "strum 0.26.3",
+ "strum_macros 0.26.4",
"thiserror 1.0.69",
"typed-builder 0.19.1",
"uuid",
@@ -3529,6 +3529,7 @@ dependencies = [
"serde_json",
"serde_repr",
"serde_with",
+ "strum 0.27.1",
"tempfile",
"tera",
"thrift",
@@ -3671,6 +3672,7 @@ dependencies = [
"anyhow",
"async-trait",
"datafusion",
+ "expect-test",
"futures",
"iceberg",
"iceberg-catalog-memory",
@@ -6732,6 +6734,15 @@ version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
+[[package]]
+name = "strum"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32"
+dependencies = [
+ "strum_macros 0.27.1",
+]
+
[[package]]
name = "strum_macros"
version = "0.26.4"
@@ -6745,6 +6756,19 @@ dependencies = [
"syn 2.0.101",
]
+[[package]]
+name = "strum_macros"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn 2.0.101",
+]
+
[[package]]
name = "subst"
version = "0.3.7"
diff --git a/Cargo.toml b/Cargo.toml
index 43d03520..fd83fd55 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -107,6 +107,7 @@ serde_repr = "0.1.16"
serde_with = "3.4"
sqllogictest = "0.28"
stacker = "0.1.20"
+strum = "0.27"
tempfile = "3.18"
tera = "1"
thrift = "0.17.0"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 9f5d4901..52f5b4d8 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -62,6 +62,7 @@ bimap = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
derive_builder = { workspace = true }
+expect-test = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
@@ -82,6 +83,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
+strum = { workspace = true, features = ["derive"] }
thrift = { workspace = true }
tokio = { workspace = true, optional = false, features = ["sync"] }
typed-builder = { workspace = true }
diff --git a/crates/iceberg/src/inspect/manifests.rs
b/crates/iceberg/src/inspect/manifests.rs
index 9fd732ea..60854b8b 100644
--- a/crates/iceberg/src/inspect/manifests.rs
+++ b/crates/iceberg/src/inspect/manifests.rs
@@ -281,9 +281,10 @@ impl<'a> ManifestsTable<'a> {
#[cfg(test)]
mod tests {
use expect_test::expect;
+ use futures::TryStreamExt;
- use crate::inspect::metadata_table::tests::check_record_batches;
use crate::scan::tests::TableTestFixture;
+ use crate::test_utils::check_record_batches;
#[tokio::test]
async fn test_manifests_table() {
@@ -293,7 +294,7 @@ mod tests {
let record_batch =
fixture.table.inspect().manifests().scan().await.unwrap();
check_record_batches(
- record_batch,
+ record_batch.try_collect::<Vec<_>>().await.unwrap(),
expect![[r#"
Field { name: "content", data_type: Int32, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
Field { name: "path", data_type: Utf8, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
@@ -378,6 +379,6 @@ mod tests {
]"#]],
&["path", "length"],
Some("path"),
- ).await;
+ );
}
}
diff --git a/crates/iceberg/src/inspect/metadata_table.rs
b/crates/iceberg/src/inspect/metadata_table.rs
index b2dfbb7b..92571db1 100644
--- a/crates/iceberg/src/inspect/metadata_table.rs
+++ b/crates/iceberg/src/inspect/metadata_table.rs
@@ -27,6 +27,43 @@ use crate::table::Table;
#[derive(Debug)]
pub struct MetadataTable<'a>(&'a Table);
+/// Metadata table type.
+#[derive(Debug, Clone, strum::EnumIter)]
+pub enum MetadataTableType {
+ /// [`SnapshotsTable`]
+ Snapshots,
+ /// [`ManifestsTable`]
+ Manifests,
+}
+
+impl MetadataTableType {
+ /// Returns the string representation of the metadata table type.
+ pub fn as_str(&self) -> &str {
+ match self {
+ MetadataTableType::Snapshots => "snapshots",
+ MetadataTableType::Manifests => "manifests",
+ }
+ }
+
+ /// Returns all the metadata table types.
+ pub fn all_types() -> impl Iterator<Item = Self> {
+ use strum::IntoEnumIterator;
+ Self::iter()
+ }
+}
+
+impl TryFrom<&str> for MetadataTableType {
+ type Error = String;
+
+ fn try_from(value: &str) -> std::result::Result<Self, String> {
+ match value {
+ "snapshots" => Ok(Self::Snapshots),
+ "manifests" => Ok(Self::Manifests),
+ _ => Err(format!("invalid metadata table type: {value}")),
+ }
+ }
+}
+
impl<'a> MetadataTable<'a> {
/// Creates a new metadata scan.
pub fn new(table: &'a Table) -> Self {
@@ -43,69 +80,3 @@ impl<'a> MetadataTable<'a> {
ManifestsTable::new(self.0)
}
}
-
-#[cfg(test)]
-pub mod tests {
- //! Sharable tests for the metadata table.
-
- use expect_test::Expect;
- use futures::TryStreamExt;
- use itertools::Itertools;
-
- use crate::scan::ArrowRecordBatchStream;
-
- /// Snapshot testing to check the resulting record batch.
- ///
- /// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
- /// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically
update the result,
- /// or use rust-analyzer (see
[video](https://github.com/rust-analyzer/expect-test)).
- /// Check the doc of [`expect_test`] for more details.
- /// - `ignore_check_columns`: Some columns are not stable, so we can skip
them.
- /// - `sort_column`: The order of the data might be non-deterministic, so
we can sort it by a column.
- pub async fn check_record_batches(
- batch_stream: ArrowRecordBatchStream,
- expected_schema: Expect,
- expected_data: Expect,
- ignore_check_columns: &[&str],
- sort_column: Option<&str>,
- ) {
- let record_batches =
batch_stream.try_collect::<Vec<_>>().await.unwrap();
- assert!(!record_batches.is_empty(), "Empty record batches");
-
- // Combine record batches using the first batch's schema
- let first_batch = record_batches.first().unwrap();
- let record_batch =
- arrow_select::concat::concat_batches(&first_batch.schema(),
&record_batches).unwrap();
-
- let mut columns = record_batch.columns().to_vec();
- if let Some(sort_column) = sort_column {
- let column = record_batch.column_by_name(sort_column).unwrap();
- let indices = arrow_ord::sort::sort_to_indices(column, None,
None).unwrap();
- columns = columns
- .iter()
- .map(|column| arrow_select::take::take(column.as_ref(),
&indices, None).unwrap())
- .collect_vec();
- }
-
- expected_schema.assert_eq(&format!(
- "{}",
- record_batch.schema().fields().iter().format(",\n")
- ));
- expected_data.assert_eq(&format!(
- "{}",
- record_batch
- .schema()
- .fields()
- .iter()
- .zip_eq(columns)
- .map(|(field, column)| {
- if ignore_check_columns.contains(&field.name().as_str()) {
- format!("{}: (skipped)", field.name())
- } else {
- format!("{}: {:?}", field.name(), column)
- }
- })
- .format(",\n")
- ));
- }
-}
diff --git a/crates/iceberg/src/inspect/snapshots.rs
b/crates/iceberg/src/inspect/snapshots.rs
index 002cc8eb..6081ec16 100644
--- a/crates/iceberg/src/inspect/snapshots.rs
+++ b/crates/iceberg/src/inspect/snapshots.rs
@@ -137,9 +137,10 @@ impl<'a> SnapshotsTable<'a> {
#[cfg(test)]
mod tests {
use expect_test::expect;
+ use futures::TryStreamExt;
- use crate::inspect::metadata_table::tests::check_record_batches;
use crate::scan::tests::TableTestFixture;
+ use crate::test_utils::check_record_batches;
#[tokio::test]
async fn test_snapshots_table() {
@@ -148,7 +149,7 @@ mod tests {
let batch_stream = table.inspect().snapshots().scan().await.unwrap();
check_record_batches(
- batch_stream,
+ batch_stream.try_collect::<Vec<_>>().await.unwrap(),
expect![[r#"
Field { name: "committed_at", data_type:
Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
Field { name: "snapshot_id", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"}
},
@@ -211,6 +212,6 @@ mod tests {
]"#]],
&["manifest_list"],
Some("committed_at"),
- ).await;
+ );
}
}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 06e39156..63af25e9 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -84,6 +84,7 @@ mod runtime;
pub mod arrow;
pub(crate) mod delete_file_index;
+pub mod test_utils;
mod utils;
pub mod writer;
diff --git a/crates/iceberg/src/test_utils.rs b/crates/iceberg/src/test_utils.rs
new file mode 100644
index 00000000..527d37bb
--- /dev/null
+++ b/crates/iceberg/src/test_utils.rs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test utilities.
+//! This module is pub just for internal testing.
+//! It is subject to change and is not intended to be used by external users.
+
+use arrow_array::RecordBatch;
+use expect_test::Expect;
+use itertools::Itertools;
+
+/// Snapshot testing to check the resulting record batch.
+///
+/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
+/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically
update the result,
+/// or use rust-analyzer (see
[video](https://github.com/rust-analyzer/expect-test)).
+/// Check the doc of [`expect_test`] for more details.
+/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
+/// - `sort_column`: The order of the data might be non-deterministic, so we
can sort it by a column.
+pub fn check_record_batches(
+ record_batches: Vec<RecordBatch>,
+ expected_schema: Expect,
+ expected_data: Expect,
+ ignore_check_columns: &[&str],
+ sort_column: Option<&str>,
+) {
+ assert!(!record_batches.is_empty(), "Empty record batches");
+
+ // Combine record batches using the first batch's schema
+ let first_batch = record_batches.first().unwrap();
+ let record_batch =
+ arrow_select::concat::concat_batches(&first_batch.schema(),
&record_batches).unwrap();
+
+ let mut columns = record_batch.columns().to_vec();
+ if let Some(sort_column) = sort_column {
+ let column = record_batch.column_by_name(sort_column).unwrap();
+ let indices = arrow_ord::sort::sort_to_indices(column, None,
None).unwrap();
+ columns = columns
+ .iter()
+ .map(|column| arrow_select::take::take(column.as_ref(), &indices,
None).unwrap())
+ .collect_vec();
+ }
+
+ expected_schema.assert_eq(&format!(
+ "{}",
+ record_batch.schema().fields().iter().format(",\n")
+ ));
+ expected_data.assert_eq(&format!(
+ "{}",
+ record_batch
+ .schema()
+ .fields()
+ .iter()
+ .zip_eq(columns)
+ .map(|(field, column)| {
+ if ignore_check_columns.contains(&field.name().as_str()) {
+ format!("{}: (skipped)", field.name())
+ } else {
+ format!("{}: {:?}", field.name(), column)
+ }
+ })
+ .format(",\n")
+ ));
+}
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index a152b9d2..0ff370cf 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -37,6 +37,7 @@ iceberg = { workspace = true }
tokio = { workspace = true }
[dev-dependencies]
+expect-test = { workspace = true }
iceberg-catalog-memory = { workspace = true }
parquet = { workspace = true }
tempfile = { workspace = true }
diff --git a/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
new file mode 100644
index 00000000..9a9d0aa0
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::catalog::TableProvider;
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning,
PlanProperties};
+use futures::TryStreamExt;
+
+use crate::metadata_table::IcebergMetadataTableProvider;
+
+#[derive(Debug)]
+pub struct IcebergMetadataScan {
+ provider: IcebergMetadataTableProvider,
+ properties: PlanProperties,
+}
+
+impl IcebergMetadataScan {
+ pub fn new(provider: IcebergMetadataTableProvider) -> Self {
+ let properties = PlanProperties::new(
+ EquivalenceProperties::new(provider.schema()),
+ Partitioning::UnknownPartitioning(1),
+ EmissionType::Incremental,
+ Boundedness::Bounded,
+ );
+ Self {
+ provider,
+ properties,
+ }
+ }
+}
+
+impl DisplayAs for IcebergMetadataScan {
+ fn fmt_as(
+ &self,
+ _t: datafusion::physical_plan::DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "IcebergMetadataScan")
+ }
+}
+
+impl ExecutionPlan for IcebergMetadataScan {
+ fn name(&self) -> &str {
+ "IcebergMetadataScan"
+ }
+
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
+ }
+
+ fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: std::sync::Arc<Self>,
+ _children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
+ ) -> datafusion::error::Result<std::sync::Arc<dyn ExecutionPlan>> {
+ Ok(self)
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: std::sync::Arc<datafusion::execution::TaskContext>,
+ ) ->
datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
+ let fut = self.provider.clone().scan();
+ let stream = futures::stream::once(fut).try_flatten();
+ let schema = self.provider.schema();
+ Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+ }
+}
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs
b/crates/integrations/datafusion/src/physical_plan/mod.rs
index 2fab109d..58fb065d 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -16,4 +16,5 @@
// under the License.
pub(crate) mod expr_to_predicate;
+pub(crate) mod metadata_scan;
pub(crate) mod scan;
diff --git a/crates/integrations/datafusion/src/schema.rs
b/crates/integrations/datafusion/src/schema.rs
index 3be6da42..3920ee73 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -22,8 +22,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
-use datafusion::error::Result as DFResult;
+use datafusion::error::{DataFusionError, Result as DFResult};
use futures::future::try_join_all;
+use iceberg::inspect::MetadataTableType;
use iceberg::{Catalog, NamespaceIdent, Result};
use crate::table::IcebergTableProvider;
@@ -35,7 +36,7 @@ pub(crate) struct IcebergSchemaProvider {
/// A `HashMap` where keys are table names
/// and values are dynamic references to objects implementing the
/// [`TableProvider`] trait.
- tables: HashMap<String, Arc<dyn TableProvider>>,
+ tables: HashMap<String, Arc<IcebergTableProvider>>,
}
impl IcebergSchemaProvider {
@@ -69,13 +70,10 @@ impl IcebergSchemaProvider {
)
.await?;
- let tables: HashMap<String, Arc<dyn TableProvider>> = table_names
+ let tables: HashMap<String, Arc<IcebergTableProvider>> = table_names
.into_iter()
.zip(providers.into_iter())
- .map(|(name, provider)| {
- let provider = Arc::new(provider) as Arc<dyn TableProvider>;
- (name, provider)
- })
+ .map(|(name, provider)| (name, Arc::new(provider)))
.collect();
Ok(IcebergSchemaProvider { tables })
@@ -89,14 +87,43 @@ impl SchemaProvider for IcebergSchemaProvider {
}
fn table_names(&self) -> Vec<String> {
- self.tables.keys().cloned().collect()
+ self.tables
+ .keys()
+ .flat_map(|table_name| {
+ [table_name.clone()]
+ .into_iter()
+
.chain(MetadataTableType::all_types().map(|metadata_table_name| {
+ format!("{}${}", table_name.clone(),
metadata_table_name.as_str())
+ }))
+ })
+ .collect()
}
fn table_exist(&self, name: &str) -> bool {
- self.tables.contains_key(name)
+ if let Some((table_name, metadata_table_name)) = name.split_once('$') {
+ self.tables.contains_key(table_name)
+ && MetadataTableType::try_from(metadata_table_name).is_ok()
+ } else {
+ self.tables.contains_key(name)
+ }
}
async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn
TableProvider>>> {
- Ok(self.tables.get(name).cloned())
+ if let Some((table_name, metadata_table_name)) = name.split_once('$') {
+ let metadata_table_type =
+
MetadataTableType::try_from(metadata_table_name).map_err(DataFusionError::Plan)?;
+ if let Some(table) = self.tables.get(table_name) {
+ let metadata_table = table.metadata_table(metadata_table_type);
+ return Ok(Some(Arc::new(metadata_table)));
+ } else {
+ return Ok(None);
+ }
+ }
+
+ Ok(self
+ .tables
+ .get(name)
+ .cloned()
+ .map(|t| t as Arc<dyn TableProvider>))
}
}
diff --git a/crates/integrations/datafusion/src/table/metadata_table.rs
b/crates/integrations/datafusion/src/table/metadata_table.rs
new file mode 100644
index 00000000..38148b40
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/metadata_table.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::TryStreamExt;
+use futures::stream::BoxStream;
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::inspect::MetadataTableType;
+use iceberg::table::Table;
+
+use crate::physical_plan::metadata_scan::IcebergMetadataScan;
+use crate::to_datafusion_error;
+
+/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
+/// managing access to a [`MetadataTable`].
+#[derive(Debug, Clone)]
+pub struct IcebergMetadataTableProvider {
+ pub(crate) table: Table,
+ pub(crate) r#type: MetadataTableType,
+}
+
+#[async_trait]
+impl TableProvider for IcebergMetadataTableProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> ArrowSchemaRef {
+ let metadata_table = self.table.inspect();
+ let schema = match self.r#type {
+ MetadataTableType::Snapshots =>
metadata_table.snapshots().schema(),
+ MetadataTableType::Manifests =>
metadata_table.manifests().schema(),
+ };
+ schema_to_arrow_schema(&schema).unwrap().into()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ _projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(IcebergMetadataScan::new(self.clone())))
+ }
+}
+
+impl IcebergMetadataTableProvider {
+ pub async fn scan(self) -> DFResult<BoxStream<'static,
DFResult<RecordBatch>>> {
+ let metadata_table = self.table.inspect();
+ let stream = match self.r#type {
+ MetadataTableType::Snapshots =>
metadata_table.snapshots().scan().await,
+ MetadataTableType::Manifests =>
metadata_table.manifests().scan().await,
+ }
+ .map_err(to_datafusion_error)?;
+ let stream = stream.map_err(to_datafusion_error);
+ Ok(Box::pin(stream))
+ }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs
b/crates/integrations/datafusion/src/table/mod.rs
index df81688d..7f741a53 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+pub mod metadata_table;
pub mod table_provider_factory;
use std::any::Any;
@@ -28,8 +29,10 @@ use datafusion::error::Result as DFResult;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::inspect::MetadataTableType;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
+use metadata_table::IcebergMetadataTableProvider;
use crate::physical_plan::scan::IcebergTableScan;
@@ -107,6 +110,13 @@ impl IcebergTableProvider {
schema,
})
}
+
+ pub(crate) fn metadata_table(&self, r#type: MetadataTableType) ->
IcebergMetadataTableProvider {
+ IcebergMetadataTableProvider {
+ table: self.table.clone(),
+ r#type,
+ }
+ }
}
#[async_trait]
diff --git
a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index c21b72b3..e9b0eb3c 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -25,8 +25,10 @@ use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::execution::context::SessionContext;
use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use expect_test::expect;
use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
+use iceberg::test_utils::check_record_batches;
use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
use iceberg_catalog_memory::MemoryCatalog;
use iceberg_datafusion::IcebergCatalogProvider;
@@ -154,10 +156,16 @@ async fn test_provider_list_table_names() -> Result<()> {
let provider = ctx.catalog("catalog").unwrap();
let schema = provider.schema("test_provider_list_table_names").unwrap();
- let expected = vec!["my_table"];
let result = schema.table_names();
- assert_eq!(result, expected);
+ expect![[r#"
+ [
+ "my_table",
+ "my_table$snapshots",
+ "my_table$manifests",
+ ]
+ "#]]
+ .assert_debug_eq(&result);
Ok(())
}
@@ -299,3 +307,129 @@ async fn test_table_predict_pushdown() -> Result<()> {
assert!(s.value(1).trim().contains(expected));
Ok(())
}
+
+#[tokio::test]
+async fn test_metadata_table() -> Result<()> {
+ let iceberg_catalog = get_iceberg_catalog();
+ let namespace = NamespaceIdent::new("ns".to_string());
+ set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "foo",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "bar",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()?;
+ let creation = get_table_creation(temp_path(), "t1", Some(schema))?;
+ iceberg_catalog.create_table(&namespace, creation).await?;
+
+ let client = Arc::new(iceberg_catalog);
+ let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+ let ctx = SessionContext::new();
+ ctx.register_catalog("catalog", catalog);
+ let snapshots = ctx
+ .sql("select * from catalog.ns.t1$snapshots")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ check_record_batches(
+ snapshots,
+ expect![[r#"
+ Field { name: "committed_at", data_type: Timestamp(Microsecond,
Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "1"} },
+ Field { name: "snapshot_id", data_type: Int64, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+ Field { name: "parent_id", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
+ Field { name: "operation", data_type: Utf8, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
+ Field { name: "manifest_list", data_type: Utf8, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
+ Field { name: "summary", data_type: Map(Field { name: "key_value",
data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} },
Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }]), nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable:
true, dict_id: 0, dict_is_ordered: false, me [...]
+ expect![[r#"
+ committed_at: PrimitiveArray<Timestamp(Microsecond,
Some("+00:00"))>
+ [
+ ],
+ snapshot_id: PrimitiveArray<Int64>
+ [
+ ],
+ parent_id: PrimitiveArray<Int64>
+ [
+ ],
+ operation: StringArray
+ [
+ ],
+ manifest_list: StringArray
+ [
+ ],
+ summary: MapArray
+ [
+ ]"#]],
+ &[],
+ None,
+ );
+
+ let manifests = ctx
+ .sql("select * from catalog.ns.t1$manifests")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ check_record_batches(
+ manifests,
+ expect![[r#"
+ Field { name: "content", data_type: Int32, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
+ Field { name: "path", data_type: Utf8, nullable: false, dict_id:
0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
+ Field { name: "length", data_type: Int64, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+ Field { name: "partition_spec_id", data_type: Int32, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"}
},
+ Field { name: "added_snapshot_id", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"}
},
+ Field { name: "added_data_files_count", data_type: Int32,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "5"} },
+ Field { name: "existing_data_files_count", data_type: Int32,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "6"} },
+ Field { name: "deleted_data_files_count", data_type: Int32,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "7"} },
+ Field { name: "added_delete_files_count", data_type: Int32,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "15"} },
+ Field { name: "existing_delete_files_count", data_type: Int32,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "16"} },
+ Field { name: "deleted_delete_files_count", data_type: Int32,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "17"} },
+ Field { name: "partition_summaries", data_type: List(Field { name:
"item", data_type: Struct([Field { name: "contains_null", data_type: Boolean,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8,
nullable: true, dict_id: 0, dict_is_ordered: false, me [...]
+ expect![[r#"
+ content: PrimitiveArray<Int32>
+ [
+ ],
+ path: StringArray
+ [
+ ],
+ length: PrimitiveArray<Int64>
+ [
+ ],
+ partition_spec_id: PrimitiveArray<Int32>
+ [
+ ],
+ added_snapshot_id: PrimitiveArray<Int64>
+ [
+ ],
+ added_data_files_count: PrimitiveArray<Int32>
+ [
+ ],
+ existing_data_files_count: PrimitiveArray<Int32>
+ [
+ ],
+ deleted_data_files_count: PrimitiveArray<Int32>
+ [
+ ],
+ added_delete_files_count: PrimitiveArray<Int32>
+ [
+ ],
+ existing_delete_files_count: PrimitiveArray<Int32>
+ [
+ ],
+ deleted_delete_files_count: PrimitiveArray<Int32>
+ [
+ ],
+ partition_summaries: ListArray
+ [
+ ]"#]],
+ &[],
+ None,
+ );
+
+ Ok(())
+}