This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 36cc1208 feat(datafusion): support metadata tables for Datafusion 
(#879)
36cc1208 is described below

commit 36cc12087bc118f1fe10efa6b30db98bd9655ba7
Author: xxchan <[email protected]>
AuthorDate: Wed Jul 2 00:16:43 2025 +0800

    feat(datafusion): support metadata tables for Datafusion (#879)
    
    Use the newly added metadata tables (#823) to support this feature in
    the DataFusion engine and to demonstrate their usage.
    
    This may also potentially help design and stabilize the interface of
    metadata tables in the `iceberg` crate.
    
    ---------
    
    Signed-off-by: xxchan <[email protected]>
---
 Cargo.lock                                         |  28 ++++-
 Cargo.toml                                         |   1 +
 crates/iceberg/Cargo.toml                          |   2 +
 crates/iceberg/src/inspect/manifests.rs            |   7 +-
 crates/iceberg/src/inspect/metadata_table.rs       | 103 ++++++---------
 crates/iceberg/src/inspect/snapshots.rs            |   7 +-
 crates/iceberg/src/lib.rs                          |   1 +
 crates/iceberg/src/test_utils.rs                   |  78 ++++++++++++
 crates/integrations/datafusion/Cargo.toml          |   1 +
 .../datafusion/src/physical_plan/metadata_scan.rs  |  92 ++++++++++++++
 .../datafusion/src/physical_plan/mod.rs            |   1 +
 crates/integrations/datafusion/src/schema.rs       |  47 +++++--
 .../datafusion/src/table/metadata_table.rs         |  87 +++++++++++++
 crates/integrations/datafusion/src/table/mod.rs    |  10 ++
 .../tests/integration_datafusion_test.rs           | 138 ++++++++++++++++++++-
 15 files changed, 517 insertions(+), 86 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ca88923c..a356bb38 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -181,8 +181,8 @@ dependencies = [
  "serde_bytes",
  "serde_json",
  "snap",
- "strum",
- "strum_macros",
+ "strum 0.26.3",
+ "strum_macros 0.26.4",
  "thiserror 1.0.69",
  "typed-builder 0.19.1",
  "uuid",
@@ -3529,6 +3529,7 @@ dependencies = [
  "serde_json",
  "serde_repr",
  "serde_with",
+ "strum 0.27.1",
  "tempfile",
  "tera",
  "thrift",
@@ -3671,6 +3672,7 @@ dependencies = [
  "anyhow",
  "async-trait",
  "datafusion",
+ "expect-test",
  "futures",
  "iceberg",
  "iceberg-catalog-memory",
@@ -6732,6 +6734,15 @@ version = "0.26.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
 
+[[package]]
+name = "strum"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32"
+dependencies = [
+ "strum_macros 0.27.1",
+]
+
 [[package]]
 name = "strum_macros"
 version = "0.26.4"
@@ -6745,6 +6756,19 @@ dependencies = [
  "syn 2.0.101",
 ]
 
+[[package]]
+name = "strum_macros"
+version = "0.27.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "subst"
 version = "0.3.7"
diff --git a/Cargo.toml b/Cargo.toml
index 43d03520..fd83fd55 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -107,6 +107,7 @@ serde_repr = "0.1.16"
 serde_with = "3.4"
 sqllogictest = "0.28"
 stacker = "0.1.20"
+strum = "0.27"
 tempfile = "3.18"
 tera = "1"
 thrift = "0.17.0"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 9f5d4901..52f5b4d8 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -62,6 +62,7 @@ bimap = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
+expect-test = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
@@ -82,6 +83,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+strum = { workspace = true, features = ["derive"] }
 thrift = { workspace = true }
 tokio = { workspace = true, optional = false, features = ["sync"] }
 typed-builder = { workspace = true }
diff --git a/crates/iceberg/src/inspect/manifests.rs 
b/crates/iceberg/src/inspect/manifests.rs
index 9fd732ea..60854b8b 100644
--- a/crates/iceberg/src/inspect/manifests.rs
+++ b/crates/iceberg/src/inspect/manifests.rs
@@ -281,9 +281,10 @@ impl<'a> ManifestsTable<'a> {
 #[cfg(test)]
 mod tests {
     use expect_test::expect;
+    use futures::TryStreamExt;
 
-    use crate::inspect::metadata_table::tests::check_record_batches;
     use crate::scan::tests::TableTestFixture;
+    use crate::test_utils::check_record_batches;
 
     #[tokio::test]
     async fn test_manifests_table() {
@@ -293,7 +294,7 @@ mod tests {
         let record_batch = 
fixture.table.inspect().manifests().scan().await.unwrap();
 
         check_record_batches(
-            record_batch,
+            record_batch.try_collect::<Vec<_>>().await.unwrap(),
             expect![[r#"
                 Field { name: "content", data_type: Int32, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
                 Field { name: "path", data_type: Utf8, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
@@ -378,6 +379,6 @@ mod tests {
                 ]"#]],
             &["path", "length"],
             Some("path"),
-        ).await;
+        );
     }
 }
diff --git a/crates/iceberg/src/inspect/metadata_table.rs 
b/crates/iceberg/src/inspect/metadata_table.rs
index b2dfbb7b..92571db1 100644
--- a/crates/iceberg/src/inspect/metadata_table.rs
+++ b/crates/iceberg/src/inspect/metadata_table.rs
@@ -27,6 +27,43 @@ use crate::table::Table;
 #[derive(Debug)]
 pub struct MetadataTable<'a>(&'a Table);
 
+/// Metadata table type.
+#[derive(Debug, Clone, strum::EnumIter)]
+pub enum MetadataTableType {
+    /// [`SnapshotsTable`]
+    Snapshots,
+    /// [`ManifestsTable`]
+    Manifests,
+}
+
+impl MetadataTableType {
+    /// Returns the string representation of the metadata table type.
+    pub fn as_str(&self) -> &str {
+        match self {
+            MetadataTableType::Snapshots => "snapshots",
+            MetadataTableType::Manifests => "manifests",
+        }
+    }
+
+    /// Returns all the metadata table types.
+    pub fn all_types() -> impl Iterator<Item = Self> {
+        use strum::IntoEnumIterator;
+        Self::iter()
+    }
+}
+
+impl TryFrom<&str> for MetadataTableType {
+    type Error = String;
+
+    fn try_from(value: &str) -> std::result::Result<Self, String> {
+        match value {
+            "snapshots" => Ok(Self::Snapshots),
+            "manifests" => Ok(Self::Manifests),
+            _ => Err(format!("invalid metadata table type: {value}")),
+        }
+    }
+}
+
 impl<'a> MetadataTable<'a> {
     /// Creates a new metadata scan.
     pub fn new(table: &'a Table) -> Self {
@@ -43,69 +80,3 @@ impl<'a> MetadataTable<'a> {
         ManifestsTable::new(self.0)
     }
 }
-
-#[cfg(test)]
-pub mod tests {
-    //! Sharable tests for the metadata table.
-
-    use expect_test::Expect;
-    use futures::TryStreamExt;
-    use itertools::Itertools;
-
-    use crate::scan::ArrowRecordBatchStream;
-
-    /// Snapshot testing to check the resulting record batch.
-    ///
-    /// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
-    ///   and then run test with `UPDATE_EXPECT=1 cargo test` to automatically 
update the result,
-    ///   or use rust-analyzer (see 
[video](https://github.com/rust-analyzer/expect-test)).
-    ///   Check the doc of [`expect_test`] for more details.
-    /// - `ignore_check_columns`: Some columns are not stable, so we can skip 
them.
-    /// - `sort_column`: The order of the data might be non-deterministic, so 
we can sort it by a column.
-    pub async fn check_record_batches(
-        batch_stream: ArrowRecordBatchStream,
-        expected_schema: Expect,
-        expected_data: Expect,
-        ignore_check_columns: &[&str],
-        sort_column: Option<&str>,
-    ) {
-        let record_batches = 
batch_stream.try_collect::<Vec<_>>().await.unwrap();
-        assert!(!record_batches.is_empty(), "Empty record batches");
-
-        // Combine record batches using the first batch's schema
-        let first_batch = record_batches.first().unwrap();
-        let record_batch =
-            arrow_select::concat::concat_batches(&first_batch.schema(), 
&record_batches).unwrap();
-
-        let mut columns = record_batch.columns().to_vec();
-        if let Some(sort_column) = sort_column {
-            let column = record_batch.column_by_name(sort_column).unwrap();
-            let indices = arrow_ord::sort::sort_to_indices(column, None, 
None).unwrap();
-            columns = columns
-                .iter()
-                .map(|column| arrow_select::take::take(column.as_ref(), 
&indices, None).unwrap())
-                .collect_vec();
-        }
-
-        expected_schema.assert_eq(&format!(
-            "{}",
-            record_batch.schema().fields().iter().format(",\n")
-        ));
-        expected_data.assert_eq(&format!(
-            "{}",
-            record_batch
-                .schema()
-                .fields()
-                .iter()
-                .zip_eq(columns)
-                .map(|(field, column)| {
-                    if ignore_check_columns.contains(&field.name().as_str()) {
-                        format!("{}: (skipped)", field.name())
-                    } else {
-                        format!("{}: {:?}", field.name(), column)
-                    }
-                })
-                .format(",\n")
-        ));
-    }
-}
diff --git a/crates/iceberg/src/inspect/snapshots.rs 
b/crates/iceberg/src/inspect/snapshots.rs
index 002cc8eb..6081ec16 100644
--- a/crates/iceberg/src/inspect/snapshots.rs
+++ b/crates/iceberg/src/inspect/snapshots.rs
@@ -137,9 +137,10 @@ impl<'a> SnapshotsTable<'a> {
 #[cfg(test)]
 mod tests {
     use expect_test::expect;
+    use futures::TryStreamExt;
 
-    use crate::inspect::metadata_table::tests::check_record_batches;
     use crate::scan::tests::TableTestFixture;
+    use crate::test_utils::check_record_batches;
 
     #[tokio::test]
     async fn test_snapshots_table() {
@@ -148,7 +149,7 @@ mod tests {
         let batch_stream = table.inspect().snapshots().scan().await.unwrap();
 
         check_record_batches(
-            batch_stream,
+            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
             expect![[r#"
                 Field { name: "committed_at", data_type: 
Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
                 Field { name: "snapshot_id", data_type: Int64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} 
},
@@ -211,6 +212,6 @@ mod tests {
                 ]"#]],
             &["manifest_list"],
             Some("committed_at"),
-        ).await;
+        );
     }
 }
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 06e39156..63af25e9 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -84,6 +84,7 @@ mod runtime;
 
 pub mod arrow;
 pub(crate) mod delete_file_index;
+pub mod test_utils;
 mod utils;
 pub mod writer;
 
diff --git a/crates/iceberg/src/test_utils.rs b/crates/iceberg/src/test_utils.rs
new file mode 100644
index 00000000..527d37bb
--- /dev/null
+++ b/crates/iceberg/src/test_utils.rs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test utilities.
+//! This module is pub just for internal testing.
+//! It is subject to change and is not intended to be used by external users.
+
+use arrow_array::RecordBatch;
+use expect_test::Expect;
+use itertools::Itertools;
+
+/// Snapshot testing to check the resulting record batch.
+///
+/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
+///   and then run test with `UPDATE_EXPECT=1 cargo test` to automatically 
update the result,
+///   or use rust-analyzer (see 
[video](https://github.com/rust-analyzer/expect-test)).
+///   Check the doc of [`expect_test`] for more details.
+/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
+/// - `sort_column`: The order of the data might be non-deterministic, so we 
can sort it by a column.
+pub fn check_record_batches(
+    record_batches: Vec<RecordBatch>,
+    expected_schema: Expect,
+    expected_data: Expect,
+    ignore_check_columns: &[&str],
+    sort_column: Option<&str>,
+) {
+    assert!(!record_batches.is_empty(), "Empty record batches");
+
+    // Combine record batches using the first batch's schema
+    let first_batch = record_batches.first().unwrap();
+    let record_batch =
+        arrow_select::concat::concat_batches(&first_batch.schema(), 
&record_batches).unwrap();
+
+    let mut columns = record_batch.columns().to_vec();
+    if let Some(sort_column) = sort_column {
+        let column = record_batch.column_by_name(sort_column).unwrap();
+        let indices = arrow_ord::sort::sort_to_indices(column, None, 
None).unwrap();
+        columns = columns
+            .iter()
+            .map(|column| arrow_select::take::take(column.as_ref(), &indices, 
None).unwrap())
+            .collect_vec();
+    }
+
+    expected_schema.assert_eq(&format!(
+        "{}",
+        record_batch.schema().fields().iter().format(",\n")
+    ));
+    expected_data.assert_eq(&format!(
+        "{}",
+        record_batch
+            .schema()
+            .fields()
+            .iter()
+            .zip_eq(columns)
+            .map(|(field, column)| {
+                if ignore_check_columns.contains(&field.name().as_str()) {
+                    format!("{}: (skipped)", field.name())
+                } else {
+                    format!("{}: {:?}", field.name(), column)
+                }
+            })
+            .format(",\n")
+    ));
+}
diff --git a/crates/integrations/datafusion/Cargo.toml 
b/crates/integrations/datafusion/Cargo.toml
index a152b9d2..0ff370cf 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -37,6 +37,7 @@ iceberg = { workspace = true }
 tokio = { workspace = true }
 
 [dev-dependencies]
+expect-test = { workspace = true }
 iceberg-catalog-memory = { workspace = true }
 parquet = { workspace = true }
 tempfile = { workspace = true }
diff --git a/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs 
b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
new file mode 100644
index 00000000..9a9d0aa0
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::catalog::TableProvider;
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, 
PlanProperties};
+use futures::TryStreamExt;
+
+use crate::metadata_table::IcebergMetadataTableProvider;
+
+#[derive(Debug)]
+pub struct IcebergMetadataScan {
+    provider: IcebergMetadataTableProvider,
+    properties: PlanProperties,
+}
+
+impl IcebergMetadataScan {
+    pub fn new(provider: IcebergMetadataTableProvider) -> Self {
+        let properties = PlanProperties::new(
+            EquivalenceProperties::new(provider.schema()),
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+        Self {
+            provider,
+            properties,
+        }
+    }
+}
+
+impl DisplayAs for IcebergMetadataScan {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "IcebergMetadataScan")
+    }
+}
+
+impl ExecutionPlan for IcebergMetadataScan {
+    fn name(&self) -> &str {
+        "IcebergMetadataScan"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: std::sync::Arc<Self>,
+        _children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
+    ) -> datafusion::error::Result<std::sync::Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: std::sync::Arc<datafusion::execution::TaskContext>,
+    ) -> 
datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
+        let fut = self.provider.clone().scan();
+        let stream = futures::stream::once(fut).try_flatten();
+        let schema = self.provider.schema();
+        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+    }
+}
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs 
b/crates/integrations/datafusion/src/physical_plan/mod.rs
index 2fab109d..58fb065d 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -16,4 +16,5 @@
 // under the License.
 
 pub(crate) mod expr_to_predicate;
+pub(crate) mod metadata_scan;
 pub(crate) mod scan;
diff --git a/crates/integrations/datafusion/src/schema.rs 
b/crates/integrations/datafusion/src/schema.rs
index 3be6da42..3920ee73 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -22,8 +22,9 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion::catalog::SchemaProvider;
 use datafusion::datasource::TableProvider;
-use datafusion::error::Result as DFResult;
+use datafusion::error::{DataFusionError, Result as DFResult};
 use futures::future::try_join_all;
+use iceberg::inspect::MetadataTableType;
 use iceberg::{Catalog, NamespaceIdent, Result};
 
 use crate::table::IcebergTableProvider;
@@ -35,7 +36,7 @@ pub(crate) struct IcebergSchemaProvider {
     /// A `HashMap` where keys are table names
     /// and values are dynamic references to objects implementing the
     /// [`TableProvider`] trait.
-    tables: HashMap<String, Arc<dyn TableProvider>>,
+    tables: HashMap<String, Arc<IcebergTableProvider>>,
 }
 
 impl IcebergSchemaProvider {
@@ -69,13 +70,10 @@ impl IcebergSchemaProvider {
         )
         .await?;
 
-        let tables: HashMap<String, Arc<dyn TableProvider>> = table_names
+        let tables: HashMap<String, Arc<IcebergTableProvider>> = table_names
             .into_iter()
             .zip(providers.into_iter())
-            .map(|(name, provider)| {
-                let provider = Arc::new(provider) as Arc<dyn TableProvider>;
-                (name, provider)
-            })
+            .map(|(name, provider)| (name, Arc::new(provider)))
             .collect();
 
         Ok(IcebergSchemaProvider { tables })
@@ -89,14 +87,43 @@ impl SchemaProvider for IcebergSchemaProvider {
     }
 
     fn table_names(&self) -> Vec<String> {
-        self.tables.keys().cloned().collect()
+        self.tables
+            .keys()
+            .flat_map(|table_name| {
+                [table_name.clone()]
+                    .into_iter()
+                    
.chain(MetadataTableType::all_types().map(|metadata_table_name| {
+                        format!("{}${}", table_name.clone(), 
metadata_table_name.as_str())
+                    }))
+            })
+            .collect()
     }
 
     fn table_exist(&self, name: &str) -> bool {
-        self.tables.contains_key(name)
+        if let Some((table_name, metadata_table_name)) = name.split_once('$') {
+            self.tables.contains_key(table_name)
+                && MetadataTableType::try_from(metadata_table_name).is_ok()
+        } else {
+            self.tables.contains_key(name)
+        }
     }
 
     async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn 
TableProvider>>> {
-        Ok(self.tables.get(name).cloned())
+        if let Some((table_name, metadata_table_name)) = name.split_once('$') {
+            let metadata_table_type =
+                
MetadataTableType::try_from(metadata_table_name).map_err(DataFusionError::Plan)?;
+            if let Some(table) = self.tables.get(table_name) {
+                let metadata_table = table.metadata_table(metadata_table_type);
+                return Ok(Some(Arc::new(metadata_table)));
+            } else {
+                return Ok(None);
+            }
+        }
+
+        Ok(self
+            .tables
+            .get(name)
+            .cloned()
+            .map(|t| t as Arc<dyn TableProvider>))
     }
 }
diff --git a/crates/integrations/datafusion/src/table/metadata_table.rs 
b/crates/integrations/datafusion/src/table/metadata_table.rs
new file mode 100644
index 00000000..38148b40
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/metadata_table.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::TryStreamExt;
+use futures::stream::BoxStream;
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::inspect::MetadataTableType;
+use iceberg::table::Table;
+
+use crate::physical_plan::metadata_scan::IcebergMetadataScan;
+use crate::to_datafusion_error;
+
+/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
+/// managing access to a [`MetadataTable`].
+#[derive(Debug, Clone)]
+pub struct IcebergMetadataTableProvider {
+    pub(crate) table: Table,
+    pub(crate) r#type: MetadataTableType,
+}
+
+#[async_trait]
+impl TableProvider for IcebergMetadataTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        let metadata_table = self.table.inspect();
+        let schema = match self.r#type {
+            MetadataTableType::Snapshots => 
metadata_table.snapshots().schema(),
+            MetadataTableType::Manifests => 
metadata_table.manifests().schema(),
+        };
+        schema_to_arrow_schema(&schema).unwrap().into()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        _projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(IcebergMetadataScan::new(self.clone())))
+    }
+}
+
+impl IcebergMetadataTableProvider {
+    pub async fn scan(self) -> DFResult<BoxStream<'static, 
DFResult<RecordBatch>>> {
+        let metadata_table = self.table.inspect();
+        let stream = match self.r#type {
+            MetadataTableType::Snapshots => 
metadata_table.snapshots().scan().await,
+            MetadataTableType::Manifests => 
metadata_table.manifests().scan().await,
+        }
+        .map_err(to_datafusion_error)?;
+        let stream = stream.map_err(to_datafusion_error);
+        Ok(Box::pin(stream))
+    }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs 
b/crates/integrations/datafusion/src/table/mod.rs
index df81688d..7f741a53 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod metadata_table;
 pub mod table_provider_factory;
 
 use std::any::Any;
@@ -28,8 +29,10 @@ use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
 use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::inspect::MetadataTableType;
 use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
+use metadata_table::IcebergMetadataTableProvider;
 
 use crate::physical_plan::scan::IcebergTableScan;
 
@@ -107,6 +110,13 @@ impl IcebergTableProvider {
             schema,
         })
     }
+
+    pub(crate) fn metadata_table(&self, r#type: MetadataTableType) -> 
IcebergMetadataTableProvider {
+        IcebergMetadataTableProvider {
+            table: self.table.clone(),
+            r#type,
+        }
+    }
 }
 
 #[async_trait]
diff --git 
a/crates/integrations/datafusion/tests/integration_datafusion_test.rs 
b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index c21b72b3..e9b0eb3c 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -25,8 +25,10 @@ use datafusion::arrow::array::{Array, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
 use datafusion::execution::context::SessionContext;
 use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use expect_test::expect;
 use iceberg::io::FileIOBuilder;
 use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
+use iceberg::test_utils::check_record_batches;
 use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
 use iceberg_catalog_memory::MemoryCatalog;
 use iceberg_datafusion::IcebergCatalogProvider;
@@ -154,10 +156,16 @@ async fn test_provider_list_table_names() -> Result<()> {
     let provider = ctx.catalog("catalog").unwrap();
     let schema = provider.schema("test_provider_list_table_names").unwrap();
 
-    let expected = vec!["my_table"];
     let result = schema.table_names();
 
-    assert_eq!(result, expected);
+    expect![[r#"
+        [
+            "my_table",
+            "my_table$snapshots",
+            "my_table$manifests",
+        ]
+    "#]]
+    .assert_debug_eq(&result);
 
     Ok(())
 }
@@ -299,3 +307,129 @@ async fn test_table_predict_pushdown() -> Result<()> {
     assert!(s.value(1).trim().contains(expected));
     Ok(())
 }
+
+#[tokio::test]
+async fn test_metadata_table() -> Result<()> {
+    let iceberg_catalog = get_iceberg_catalog();
+    let namespace = NamespaceIdent::new("ns".to_string());
+    set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "foo", 
Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(2, "bar", 
Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+    let creation = get_table_creation(temp_path(), "t1", Some(schema))?;
+    iceberg_catalog.create_table(&namespace, creation).await?;
+
+    let client = Arc::new(iceberg_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("catalog", catalog);
+    let snapshots = ctx
+        .sql("select * from catalog.ns.t1$snapshots")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    check_record_batches(
+        snapshots,
+        expect![[r#"
+            Field { name: "committed_at", data_type: Timestamp(Microsecond, 
Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "1"} },
+            Field { name: "snapshot_id", data_type: Int64, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+            Field { name: "parent_id", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
+            Field { name: "operation", data_type: Utf8, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
+            Field { name: "manifest_list", data_type: Utf8, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
+            Field { name: "summary", data_type: Map(Field { name: "key_value", 
data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, 
Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }]), nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: 
true, dict_id: 0, dict_is_ordered: false, me [...]
+        expect![[r#"
+            committed_at: PrimitiveArray<Timestamp(Microsecond, 
Some("+00:00"))>
+            [
+            ],
+            snapshot_id: PrimitiveArray<Int64>
+            [
+            ],
+            parent_id: PrimitiveArray<Int64>
+            [
+            ],
+            operation: StringArray
+            [
+            ],
+            manifest_list: StringArray
+            [
+            ],
+            summary: MapArray
+            [
+            ]"#]],
+        &[],
+        None,
+    );
+
+    let manifests = ctx
+        .sql("select * from catalog.ns.t1$manifests")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    check_record_batches(
+        manifests,
+        expect![[r#"
+            Field { name: "content", data_type: Int32, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
+            Field { name: "path", data_type: Utf8, nullable: false, dict_id: 
0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
+            Field { name: "length", data_type: Int64, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+            Field { name: "partition_spec_id", data_type: Int32, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} 
},
+            Field { name: "added_snapshot_id", data_type: Int64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} 
},
+            Field { name: "added_data_files_count", data_type: Int32, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "5"} },
+            Field { name: "existing_data_files_count", data_type: Int32, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "6"} },
+            Field { name: "deleted_data_files_count", data_type: Int32, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "7"} },
+            Field { name: "added_delete_files_count", data_type: Int32, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "15"} },
+            Field { name: "existing_delete_files_count", data_type: Int32, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "16"} },
+            Field { name: "deleted_delete_files_count", data_type: Int32, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "17"} },
+            Field { name: "partition_summaries", data_type: List(Field { name: 
"item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, 
nullable: true, dict_id: 0, dict_is_ordered: false, me [...]
+        expect![[r#"
+            content: PrimitiveArray<Int32>
+            [
+            ],
+            path: StringArray
+            [
+            ],
+            length: PrimitiveArray<Int64>
+            [
+            ],
+            partition_spec_id: PrimitiveArray<Int32>
+            [
+            ],
+            added_snapshot_id: PrimitiveArray<Int64>
+            [
+            ],
+            added_data_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            existing_data_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            deleted_data_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            added_delete_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            existing_delete_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            deleted_delete_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            partition_summaries: ListArray
+            [
+            ]"#]],
+        &[],
+        None,
+    );
+
+    Ok(())
+}


Reply via email to