This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new bbd042d  Basic Integration with Datafusion (#324)
bbd042d is described below

commit bbd042da5835b3e2e8ce7c05d45377c367464efd
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Thu May 2 05:33:02 2024 +0200

    Basic Integration with Datafusion (#324)
    
    * chore: basic structure
    
    * feat: add IcebergCatalogProvider
    
    * feat: add IcebergSchemaProvider
    
    * feat: add IcebergTableProvider
    
    * chore: add integration test infr
    
    * fix: remove old test
    
    * chore: update crate structure
    
    * fix: remove workspace dep
    
    * refactor: use try_join_all
    
    * chore: remove feature flag
    
    * chore: rename package
    
    * chore: update readme
    
    * feat: add TableType
    
    * fix: import + async_trait
    
    * fix: imports + async_trait
    
    * chore: remove feature flag
    
    * fix: cargo sort
    
    * refactor: CatalogProvider `fn try_new`
    
    * refactor: SchemaProvider `fn try_new`
    
    * chore: update docs
    
    * chore: update docs
    
    * chore: update doc
    
    * feat: impl `fn schema` on TableProvider
    
    * chore: rename ArrowSchema
    
    * refactor: remove DashMap
    
    * feat: add basic IcebergTableScan
    
    * chore: fix docs
    
    * chore: add comments
    
    * fix: clippy
    
    * fix: typo
    
    * fix: license
    
    * chore: update docs
    
    * chore: move derive stmt
    
    * fix: collect into hashmap
    
    * chore: use DFResult
    
    * Update crates/integrations/datafusion/README.md
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
 Cargo.toml                                         |   2 +
 crates/iceberg/src/table.rs                        |   2 +-
 crates/integrations/datafusion/Cargo.toml          |  43 +++++
 crates/integrations/datafusion/README.md           |  22 +++
 crates/integrations/datafusion/src/catalog.rs      |  95 ++++++++++
 crates/integrations/datafusion/src/error.rs        |  32 ++++
 crates/integrations/datafusion/src/lib.rs          |  26 +++
 .../datafusion/src/physical_plan/mod.rs            |  18 ++
 .../datafusion/src/physical_plan/scan.rs           | 136 +++++++++++++++
 crates/integrations/datafusion/src/schema.rs       |  98 +++++++++++
 crates/integrations/datafusion/src/table.rs        |  88 ++++++++++
 .../datafusion/testdata/docker-compose.yaml        |  50 ++++++
 .../datafusion/testdata/hms_catalog/Dockerfile     |  34 ++++
 .../datafusion/testdata/hms_catalog/core-site.xml  |  51 ++++++
 .../tests/integration_datafusion_hms_test.rs       | 193 +++++++++++++++++++++
 15 files changed, 889 insertions(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index 3c2923d..d1894c1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
     "crates/catalog/*",
     "crates/examples",
     "crates/iceberg",
+    "crates/integrations/*",
     "crates/test_utils",
 ]
 
@@ -56,6 +57,7 @@ fnv = "1"
 futures = "0.3"
 iceberg = { version = "0.2.0", path = "./crates/iceberg" }
 iceberg-catalog-rest = { version = "0.2.0", path = "./crates/catalog/rest" }
+iceberg-catalog-hms = { version = "0.2.0", path = "./crates/catalog/hms" }
 itertools = "0.12"
 lazy_static = "1"
 log = "^0.4"
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index 839969d..f38d771 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -25,7 +25,7 @@ use futures::AsyncReadExt;
 use typed_builder::TypedBuilder;
 
 /// Table represents a table in the catalog.
-#[derive(TypedBuilder, Debug)]
+#[derive(TypedBuilder, Debug, Clone)]
 pub struct Table {
     file_io: FileIO,
     #[builder(default, setter(strip_option, into))]
diff --git a/crates/integrations/datafusion/Cargo.toml 
b/crates/integrations/datafusion/Cargo.toml
new file mode 100644
index 0000000..9f895ab
--- /dev/null
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-datafusion"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Datafusion Integration"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "integrations", "datafusion"]
+
+[dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { version = "37.0.0" }
+futures = { workspace = true }
+iceberg = { workspace = true }
+log = { workspace = true }
+tokio = { workspace = true }
+
+[dev-dependencies]
+iceberg-catalog-hms = { workspace = true }
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+port_scanner = { workspace = true }
diff --git a/crates/integrations/datafusion/README.md 
b/crates/integrations/datafusion/README.md
new file mode 100644
index 0000000..134a8ef
--- /dev/null
+++ b/crates/integrations/datafusion/README.md
@@ -0,0 +1,22 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+# Apache Iceberg DataFusion Integration
+
+This crate contains the integration of Apache DataFusion and Apache Iceberg.
diff --git a/crates/integrations/datafusion/src/catalog.rs 
b/crates/integrations/datafusion/src/catalog.rs
new file mode 100644
index 0000000..deddde9
--- /dev/null
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, sync::Arc};
+
+use datafusion::catalog::{schema::SchemaProvider, CatalogProvider};
+use futures::future::try_join_all;
+use iceberg::{Catalog, NamespaceIdent, Result};
+
+use crate::schema::IcebergSchemaProvider;
+
+/// Provides an interface to manage and access multiple schemas
+/// within an Iceberg [`Catalog`].
+///
+/// Acts as a centralized catalog provider that aggregates
+/// multiple [`SchemaProvider`], each associated with distinct namespaces.
+pub struct IcebergCatalogProvider {
+    /// A `HashMap` where keys are namespace names
+    /// and values are dynamic references to objects implementing the
+    /// [`SchemaProvider`] trait.
+    schemas: HashMap<String, Arc<dyn SchemaProvider>>,
+}
+
+impl IcebergCatalogProvider {
+    /// Asynchronously tries to construct a new [`IcebergCatalogProvider`]
+    /// using the given client to fetch and initialize schema providers for
+    /// each namespace in the Iceberg [`Catalog`].
+    ///
+    /// This method retrieves the list of namespace names
+    /// attempts to create a schema provider for each namespace, and
+    /// collects these providers into a `HashMap`.
+    pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
+        // TODO:
+        // Schemas and providers should be cached and evicted based on time
+        // As of right now; schemas might become stale.
+        let schema_names: Vec<_> = client
+            .list_namespaces(None)
+            .await?
+            .iter()
+            .flat_map(|ns| ns.as_ref().clone())
+            .collect();
+
+        let providers = try_join_all(
+            schema_names
+                .iter()
+                .map(|name| {
+                    IcebergSchemaProvider::try_new(
+                        client.clone(),
+                        NamespaceIdent::new(name.clone()),
+                    )
+                })
+                .collect::<Vec<_>>(),
+        )
+        .await?;
+
+        let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names
+            .into_iter()
+            .zip(providers.into_iter())
+            .map(|(name, provider)| {
+                let provider = Arc::new(provider) as Arc<dyn SchemaProvider>;
+                (name, provider)
+            })
+            .collect();
+
+        Ok(IcebergCatalogProvider { schemas })
+    }
+}
+
+impl CatalogProvider for IcebergCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.schemas.keys().cloned().collect()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        self.schemas.get(name).cloned()
+    }
+}
diff --git a/crates/integrations/datafusion/src/error.rs 
b/crates/integrations/datafusion/src/error.rs
new file mode 100644
index 0000000..273d92f
--- /dev/null
+++ b/crates/integrations/datafusion/src/error.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use anyhow::anyhow;
+use iceberg::{Error, ErrorKind};
+
+/// Converts a datafusion error into an iceberg error.
+pub fn from_datafusion_error(error: datafusion::error::DataFusionError) -> 
Error {
+    Error::new(
+        ErrorKind::Unexpected,
+        "Operation failed for hitting datafusion error".to_string(),
+    )
+    .with_source(anyhow!("datafusion error: {:?}", error))
+}
+/// Converts an iceberg error into a datafusion error.
+pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError 
{
+    datafusion::error::DataFusionError::External(error.into())
+}
diff --git a/crates/integrations/datafusion/src/lib.rs 
b/crates/integrations/datafusion/src/lib.rs
new file mode 100644
index 0000000..c402901
--- /dev/null
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod catalog;
+pub use catalog::*;
+
+mod error;
+pub use error::*;
+
+mod physical_plan;
+mod schema;
+mod table;
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs 
b/crates/integrations/datafusion/src/physical_plan/mod.rs
new file mode 100644
index 0000000..5ae586a
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub(crate) mod scan;
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs 
b/crates/integrations/datafusion/src/physical_plan/scan.rs
new file mode 100644
index 0000000..cc01148
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, pin::Pin, sync::Arc};
+
+use datafusion::error::Result as DFResult;
+
+use datafusion::{
+    arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef},
+    execution::{SendableRecordBatchStream, TaskContext},
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        stream::RecordBatchStreamAdapter, DisplayAs, ExecutionMode, 
ExecutionPlan, Partitioning,
+        PlanProperties,
+    },
+};
+use futures::{Stream, TryStreamExt};
+use iceberg::table::Table;
+
+use crate::to_datafusion_error;
+
+/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
+/// necessary details and computed properties required for execution planning.
+#[derive(Debug)]
+pub(crate) struct IcebergTableScan {
+    /// A table in the catalog.
+    table: Table,
+    /// A reference-counted arrow `Schema`.
+    schema: ArrowSchemaRef,
+    /// Stores certain, often expensive to compute,
+    /// plan properties used in query optimization.
+    plan_properties: PlanProperties,
+}
+
+impl IcebergTableScan {
+    /// Creates a new [`IcebergTableScan`] object.
+    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+        let plan_properties = Self::compute_properties(schema.clone());
+
+        Self {
+            table,
+            schema,
+            plan_properties,
+        }
+    }
+
+    /// Computes [`PlanProperties`] used in query optimization.
+    fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
+        // TODO:
+        // This is more or less a placeholder, to be replaced
+        // once we support output-partitioning
+        PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
+    }
+}
+
+impl ExecutionPlan for IcebergTableScan {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        let fut = get_batch_stream(self.table.clone());
+        let stream = futures::stream::once(fut).try_flatten();
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            stream,
+        )))
+    }
+}
+
+impl DisplayAs for IcebergTableScan {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "IcebergTableScan")
+    }
+}
+
+/// Asynchronously retrieves a stream of [`RecordBatch`] instances
+/// from a given table.
+///
+/// This function initializes a [`TableScan`], builds it,
+/// and then converts it into a stream of Arrow [`RecordBatch`]es.
+async fn get_batch_stream(
+    table: Table,
+) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+    let table_scan = table.scan().build().map_err(to_datafusion_error)?;
+
+    let stream = table_scan
+        .to_arrow()
+        .await
+        .map_err(to_datafusion_error)?
+        .map_err(to_datafusion_error);
+
+    Ok(Box::pin(stream))
+}
diff --git a/crates/integrations/datafusion/src/schema.rs 
b/crates/integrations/datafusion/src/schema.rs
new file mode 100644
index 0000000..2ba6962
--- /dev/null
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::error::Result as DFResult;
+use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
+use futures::future::try_join_all;
+use iceberg::{Catalog, NamespaceIdent, Result};
+
+use crate::table::IcebergTableProvider;
+
+/// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing
+/// access to table providers within a specific namespace.
+pub(crate) struct IcebergSchemaProvider {
+    /// A `HashMap` where keys are table names
+    /// and values are dynamic references to objects implementing the
+    /// [`TableProvider`] trait.
+    tables: HashMap<String, Arc<dyn TableProvider>>,
+}
+
+impl IcebergSchemaProvider {
+    /// Asynchronously tries to construct a new [`IcebergSchemaProvider`]
+    /// using the given client to fetch and initialize table providers for
+    /// the provided namespace in the Iceberg [`Catalog`].
+    ///
+    /// This method retrieves a list of table names
+    /// attempts to create a table provider for each table name, and
+    /// collects these providers into a `HashMap`.
+    pub(crate) async fn try_new(
+        client: Arc<dyn Catalog>,
+        namespace: NamespaceIdent,
+    ) -> Result<Self> {
+        // TODO:
+        // Tables and providers should be cached based on table_name
+        // if we have a cache miss; we update our internal cache & check again
+        // As of right now; tables might become stale.
+        let table_names: Vec<_> = client
+            .list_tables(&namespace)
+            .await?
+            .iter()
+            .map(|tbl| tbl.name().to_string())
+            .collect();
+
+        let providers = try_join_all(
+            table_names
+                .iter()
+                .map(|name| IcebergTableProvider::try_new(client.clone(), 
namespace.clone(), name))
+                .collect::<Vec<_>>(),
+        )
+        .await?;
+
+        let tables: HashMap<String, Arc<dyn TableProvider>> = table_names
+            .into_iter()
+            .zip(providers.into_iter())
+            .map(|(name, provider)| {
+                let provider = Arc::new(provider) as Arc<dyn TableProvider>;
+                (name, provider)
+            })
+            .collect();
+
+        Ok(IcebergSchemaProvider { tables })
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for IcebergSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.tables.keys().cloned().collect()
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.tables.get(name).is_some()
+    }
+
+    async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn 
TableProvider>>> {
+        Ok(self.tables.get(name).cloned())
+    }
+}
diff --git a/crates/integrations/datafusion/src/table.rs 
b/crates/integrations/datafusion/src/table.rs
new file mode 100644
index 0000000..46a15f6
--- /dev/null
+++ b/crates/integrations/datafusion/src/table.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::error::Result as DFResult;
+use datafusion::{
+    arrow::datatypes::SchemaRef as ArrowSchemaRef,
+    datasource::{TableProvider, TableType},
+    execution::context,
+    logical_expr::Expr,
+    physical_plan::ExecutionPlan,
+};
+use iceberg::{
+    arrow::schema_to_arrow_schema, table::Table, Catalog, NamespaceIdent, 
Result, TableIdent,
+};
+
+use crate::physical_plan::scan::IcebergTableScan;
+
+/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
+/// managing access to a [`Table`].
+pub(crate) struct IcebergTableProvider {
+    /// A table in the catalog.
+    table: Table,
+    /// A reference-counted arrow `Schema`.
+    schema: ArrowSchemaRef,
+}
+
+impl IcebergTableProvider {
+    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
+    /// using the given client and table name to fetch an actual [`Table`]
+    /// in the provided namespace.
+    pub(crate) async fn try_new(
+        client: Arc<dyn Catalog>,
+        namespace: NamespaceIdent,
+        name: impl Into<String>,
+    ) -> Result<Self> {
+        let ident = TableIdent::new(namespace, name.into());
+        let table = client.load_table(&ident).await?;
+
+        let schema = 
Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
+
+        Ok(IcebergTableProvider { table, schema })
+    }
+}
+
+#[async_trait]
+impl TableProvider for IcebergTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &context::SessionState,
+        _projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(IcebergTableScan::new(
+            self.table.clone(),
+            self.schema.clone(),
+        )))
+    }
+}
diff --git a/crates/integrations/datafusion/testdata/docker-compose.yaml 
b/crates/integrations/datafusion/testdata/docker-compose.yaml
new file mode 100644
index 0000000..282dc66
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/docker-compose.yaml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  minio:
+    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+    expose:
+      - 9000
+      - 9001
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=minio
+    command: [ "server", "/data", "--console-address", ":9001" ]
+
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 
admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb 
minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f 
/dev/null "
+
+  hive-metastore:
+    image: iceberg-hive-metastore
+    build: ./hms_catalog/
+    expose:
+      - 9083
+    environment:
+      SERVICE_NAME: "metastore"
+      SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"
diff --git a/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile 
b/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
new file mode 100644
index 0000000..ff8c9fa
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM openjdk:8-jre-slim AS build
+
+RUN apt-get update -qq && apt-get -qq -y install curl
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+RUN curl 
https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar
 -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar
+RUN curl 
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar
 -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
+
+
+FROM apache/hive:3.1.3
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar 
/opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
+COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar 
/opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar
+COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml
\ No newline at end of file
diff --git a/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml 
b/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
new file mode 100644
index 0000000..f0583a0
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
@@ -0,0 +1,51 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>s3a://warehouse/hive</value>
+    </property>
+    <property>
+        <name>fs.s3a.impl</name>
+        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+    </property>
+    <property>
+        <name>fs.s3a.fast.upload</name>
+        <value>true</value>
+    </property>
+    <property>
+      <name>fs.s3a.endpoint</name>
+      <value>http://minio:9000</value>
+    </property>
+    <property>
+      <name>fs.s3a.access.key</name>
+      <value>admin</value>
+    </property>
+    <property>
+      <name>fs.s3a.secret.key</name>
+      <value>password</value>
+    </property>
+    <property>
+      <name>fs.s3a.connection.ssl.enabled</name>
+      <value>false</value>
+    </property>
+    <property>
+      <name>fs.s3a.path.style.access</name>
+      <value>true</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git 
a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs 
b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
new file mode 100644
index 0000000..20c5cc8
--- /dev/null
+++ b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for Iceberg Datafusion with Hive Metastore.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::DataType;
+use datafusion::execution::context::SessionContext;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
+use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
+use iceberg_datafusion::IcebergCatalogProvider;
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
// Well-known service ports of the containers in the docker-compose test stack.
const HMS_CATALOG_PORT: u16 = 9083; // Hive Metastore thrift port
const MINIO_PORT: u16 = 9000; // MinIO S3-compatible endpoint
+
/// Per-test environment: the docker-compose handle is retained (even though
/// never read) so the containers stay alive for the duration of the test and
/// are torn down when the fixture is dropped.
struct TestFixture {
    _docker_compose: DockerCompose,
    hms_catalog: HmsCatalog,
}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    docker_compose.run();
+
+    let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
+    let minio_ip = docker_compose.get_container_ip("minio");
+
+    let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
+    loop {
+        if !scan_port_addr(&read_port) {
+            log::info!("Waiting for 1s hms catalog to ready...");
+            sleep(std::time::Duration::from_millis(1000)).await;
+        } else {
+            break;
+        }
+    }
+
+    let props = HashMap::from([
+        (
+            S3_ENDPOINT.to_string(),
+            format!("http://{}:{}";, minio_ip, MINIO_PORT),
+        ),
+        (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+        (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+        (S3_REGION.to_string(), "us-east-1".to_string()),
+    ]);
+
+    let config = HmsCatalogConfig::builder()
+        .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
+        .thrift_transport(HmsThriftTransport::Buffered)
+        .warehouse("s3a://warehouse/hive".to_string())
+        .props(props)
+        .build();
+
+    let hms_catalog = HmsCatalog::new(config).unwrap();
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        hms_catalog,
+    }
+}
+
+fn set_table_creation(location: impl ToString, name: impl ToString) -> 
Result<TableCreation> {
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "foo", 
Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+
+    let creation = TableCreation::builder()
+        .location(location.to_string())
+        .name(name.to_string())
+        .properties(HashMap::new())
+        .schema(schema)
+        .build();
+
+    Ok(creation)
+}
+
+#[tokio::test]
+async fn test_provider_get_table_schema() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_get_table_schema").await;
+
+    let namespace = NamespaceIdent::new("default".to_string());
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+
+    fixture
+        .hms_catalog
+        .create_table(&namespace, creation)
+        .await?;
+
+    let client = Arc::new(fixture.hms_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog);
+
+    let provider = ctx.catalog("hive").unwrap();
+    let schema = provider.schema("default").unwrap();
+
+    let table = schema.table("my_table").await.unwrap().unwrap();
+    let table_schema = table.schema();
+
+    let expected = [("foo", &DataType::Int32), ("bar", &DataType::Utf8)];
+
+    for (field, exp) in table_schema.fields().iter().zip(expected.iter()) {
+        assert_eq!(field.name(), exp.0);
+        assert_eq!(field.data_type(), exp.1);
+        assert!(!field.is_nullable())
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_provider_list_table_names() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_list_table_names").await;
+
+    let namespace = NamespaceIdent::new("default".to_string());
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+
+    fixture
+        .hms_catalog
+        .create_table(&namespace, creation)
+        .await?;
+
+    let client = Arc::new(fixture.hms_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog);
+
+    let provider = ctx.catalog("hive").unwrap();
+    let schema = provider.schema("default").unwrap();
+
+    let expected = vec!["my_table"];
+    let result = schema.table_names();
+
+    assert_eq!(result, expected);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_provider_list_schema_names() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_list_schema_names").await;
+    set_table_creation("default", "my_table")?;
+
+    let client = Arc::new(fixture.hms_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog);
+
+    let provider = ctx.catalog("hive").unwrap();
+
+    let expected = vec!["default"];
+    let result = provider.schema_names();
+
+    assert_eq!(result, expected);
+
+    Ok(())
+}


Reply via email to