This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bbd042d Basic Integration with Datafusion (#324)
bbd042d is described below
commit bbd042da5835b3e2e8ce7c05d45377c367464efd
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Thu May 2 05:33:02 2024 +0200
Basic Integration with Datafusion (#324)
* chore: basic structure
* feat: add IcebergCatalogProvider
* feat: add IcebergSchemaProvider
* feat: add IcebergTableProvider
* chore: add integration test infr
* fix: remove old test
* chore: update crate structure
* fix: remove workspace dep
* refactor: use try_join_all
* chore: remove feature flag
* chore: rename package
* chore: update readme
* feat: add TableType
* fix: import + async_trait
* fix: imports + async_trait
* chore: remove feature flag
* fix: cargo sort
* refactor: CatalogProvider `fn try_new`
* refactor: SchemaProvider `fn try_new`
* chore: update docs
* chore: update docs
* chore: update doc
* feat: impl `fn schema` on TableProvider
* chore: rename ArrowSchema
* refactor: remove DashMap
* feat: add basic IcebergTableScan
* chore: fix docs
* chore: add comments
* fix: clippy
* fix: typo
* fix: license
* chore: update docs
* chore: move derive stmt
* fix: collect into hashmap
* chore: use DFResult
* Update crates/integrations/datafusion/README.md
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---------
Co-authored-by: Renjie Liu <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
Cargo.toml | 2 +
crates/iceberg/src/table.rs | 2 +-
crates/integrations/datafusion/Cargo.toml | 43 +++++
crates/integrations/datafusion/README.md | 22 +++
crates/integrations/datafusion/src/catalog.rs | 95 ++++++++++
crates/integrations/datafusion/src/error.rs | 32 ++++
crates/integrations/datafusion/src/lib.rs | 26 +++
.../datafusion/src/physical_plan/mod.rs | 18 ++
.../datafusion/src/physical_plan/scan.rs | 136 +++++++++++++++
crates/integrations/datafusion/src/schema.rs | 98 +++++++++++
crates/integrations/datafusion/src/table.rs | 88 ++++++++++
.../datafusion/testdata/docker-compose.yaml | 50 ++++++
.../datafusion/testdata/hms_catalog/Dockerfile | 34 ++++
.../datafusion/testdata/hms_catalog/core-site.xml | 51 ++++++
.../tests/integration_datafusion_hms_test.rs | 193 +++++++++++++++++++++
15 files changed, 889 insertions(+), 1 deletion(-)
diff --git a/Cargo.toml b/Cargo.toml
index 3c2923d..d1894c1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
+ "crates/integrations/*",
"crates/test_utils",
]
@@ -56,6 +57,7 @@ fnv = "1"
futures = "0.3"
iceberg = { version = "0.2.0", path = "./crates/iceberg" }
iceberg-catalog-rest = { version = "0.2.0", path = "./crates/catalog/rest" }
+iceberg-catalog-hms = { version = "0.2.0", path = "./crates/catalog/hms" }
itertools = "0.12"
lazy_static = "1"
log = "^0.4"
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index 839969d..f38d771 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -25,7 +25,7 @@ use futures::AsyncReadExt;
use typed_builder::TypedBuilder;
/// Table represents a table in the catalog.
-#[derive(TypedBuilder, Debug)]
+#[derive(TypedBuilder, Debug, Clone)]
pub struct Table {
file_io: FileIO,
#[builder(default, setter(strip_option, into))]
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
new file mode 100644
index 0000000..9f895ab
--- /dev/null
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-datafusion"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Datafusion Integration"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "integrations", "datafusion"]
+
+[dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { version = "37.0.0" }
+futures = { workspace = true }
+iceberg = { workspace = true }
+log = { workspace = true }
+tokio = { workspace = true }
+
+[dev-dependencies]
+iceberg-catalog-hms = { workspace = true }
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+port_scanner = { workspace = true }
diff --git a/crates/integrations/datafusion/README.md
b/crates/integrations/datafusion/README.md
new file mode 100644
index 0000000..134a8ef
--- /dev/null
+++ b/crates/integrations/datafusion/README.md
@@ -0,0 +1,22 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+# Apache Iceberg DataFusion Integration
+
+This crate contains the integration of Apache DataFusion and Apache Iceberg.
diff --git a/crates/integrations/datafusion/src/catalog.rs
b/crates/integrations/datafusion/src/catalog.rs
new file mode 100644
index 0000000..deddde9
--- /dev/null
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, sync::Arc};
+
+use datafusion::catalog::{schema::SchemaProvider, CatalogProvider};
+use futures::future::try_join_all;
+use iceberg::{Catalog, NamespaceIdent, Result};
+
+use crate::schema::IcebergSchemaProvider;
+
+/// Provides an interface to manage and access multiple schemas
+/// within an Iceberg [`Catalog`].
+///
+/// Acts as a centralized catalog provider that aggregates
+/// multiple [`SchemaProvider`], each associated with distinct namespaces.
+pub struct IcebergCatalogProvider {
+ /// A `HashMap` where keys are namespace names
+ /// and values are dynamic references to objects implementing the
+ /// [`SchemaProvider`] trait.
+ schemas: HashMap<String, Arc<dyn SchemaProvider>>,
+}
+
+impl IcebergCatalogProvider {
+ /// Asynchronously tries to construct a new [`IcebergCatalogProvider`]
+ /// using the given client to fetch and initialize schema providers for
+ /// each namespace in the Iceberg [`Catalog`].
+ ///
+ /// This method retrieves the list of namespace names
+ /// attempts to create a schema provider for each namespace, and
+ /// collects these providers into a `HashMap`.
+ pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
+ // TODO:
+ // Schemas and providers should be cached and evicted based on time
+ // As of right now; schemas might become stale.
+ let schema_names: Vec<_> = client
+ .list_namespaces(None)
+ .await?
+ .iter()
+ .flat_map(|ns| ns.as_ref().clone())
+ .collect();
+
+ let providers = try_join_all(
+ schema_names
+ .iter()
+ .map(|name| {
+ IcebergSchemaProvider::try_new(
+ client.clone(),
+ NamespaceIdent::new(name.clone()),
+ )
+ })
+ .collect::<Vec<_>>(),
+ )
+ .await?;
+
+ let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names
+ .into_iter()
+ .zip(providers.into_iter())
+ .map(|(name, provider)| {
+ let provider = Arc::new(provider) as Arc<dyn SchemaProvider>;
+ (name, provider)
+ })
+ .collect();
+
+ Ok(IcebergCatalogProvider { schemas })
+ }
+}
+
+impl CatalogProvider for IcebergCatalogProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema_names(&self) -> Vec<String> {
+ self.schemas.keys().cloned().collect()
+ }
+
+ fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+ self.schemas.get(name).cloned()
+ }
+}
diff --git a/crates/integrations/datafusion/src/error.rs
b/crates/integrations/datafusion/src/error.rs
new file mode 100644
index 0000000..273d92f
--- /dev/null
+++ b/crates/integrations/datafusion/src/error.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use anyhow::anyhow;
+use iceberg::{Error, ErrorKind};
+
+/// Converts a datafusion error into an iceberg error.
+pub fn from_datafusion_error(error: datafusion::error::DataFusionError) ->
Error {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Operation failed for hitting datafusion error".to_string(),
+ )
+ .with_source(anyhow!("datafusion error: {:?}", error))
+}
+/// Converts an iceberg error into a datafusion error.
+pub fn to_datafusion_error(error: Error) -> datafusion::error::DataFusionError
{
+ datafusion::error::DataFusionError::External(error.into())
+}
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
new file mode 100644
index 0000000..c402901
--- /dev/null
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod catalog;
+pub use catalog::*;
+
+mod error;
+pub use error::*;
+
+mod physical_plan;
+mod schema;
+mod table;
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs
b/crates/integrations/datafusion/src/physical_plan/mod.rs
new file mode 100644
index 0000000..5ae586a
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub(crate) mod scan;
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs
b/crates/integrations/datafusion/src/physical_plan/scan.rs
new file mode 100644
index 0000000..cc01148
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, pin::Pin, sync::Arc};
+
+use datafusion::error::Result as DFResult;
+
+use datafusion::{
+ arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef},
+ execution::{SendableRecordBatchStream, TaskContext},
+ physical_expr::EquivalenceProperties,
+ physical_plan::{
+ stream::RecordBatchStreamAdapter, DisplayAs, ExecutionMode,
ExecutionPlan, Partitioning,
+ PlanProperties,
+ },
+};
+use futures::{Stream, TryStreamExt};
+use iceberg::table::Table;
+
+use crate::to_datafusion_error;
+
+/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
+/// necessary details and computed properties required for execution planning.
+#[derive(Debug)]
+pub(crate) struct IcebergTableScan {
+ /// A table in the catalog.
+ table: Table,
+ /// A reference-counted arrow `Schema`.
+ schema: ArrowSchemaRef,
+ /// Stores certain, often expensive to compute,
+ /// plan properties used in query optimization.
+ plan_properties: PlanProperties,
+}
+
+impl IcebergTableScan {
+ /// Creates a new [`IcebergTableScan`] object.
+ pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+ let plan_properties = Self::compute_properties(schema.clone());
+
+ Self {
+ table,
+ schema,
+ plan_properties,
+ }
+ }
+
+ /// Computes [`PlanProperties`] used in query optimization.
+ fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
+ // TODO:
+ // This is more or less a placeholder, to be replaced
+ // once we support output-partitioning
+ PlanProperties::new(
+ EquivalenceProperties::new(schema),
+ Partitioning::UnknownPartitioning(1),
+ ExecutionMode::Bounded,
+ )
+ }
+}
+
+impl ExecutionPlan for IcebergTableScan {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ Ok(self)
+ }
+
+ fn properties(&self) -> &PlanProperties {
+ &self.plan_properties
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> DFResult<SendableRecordBatchStream> {
+ let fut = get_batch_stream(self.table.clone());
+ let stream = futures::stream::once(fut).try_flatten();
+
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
+ self.schema.clone(),
+ stream,
+ )))
+ }
+}
+
+impl DisplayAs for IcebergTableScan {
+ fn fmt_as(
+ &self,
+ _t: datafusion::physical_plan::DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "IcebergTableScan")
+ }
+}
+
+/// Asynchronously retrieves a stream of [`RecordBatch`] instances
+/// from a given table.
+///
+/// This function initializes a [`TableScan`], builds it,
+/// and then converts it into a stream of Arrow [`RecordBatch`]es.
+async fn get_batch_stream(
+ table: Table,
+) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+ let table_scan = table.scan().build().map_err(to_datafusion_error)?;
+
+ let stream = table_scan
+ .to_arrow()
+ .await
+ .map_err(to_datafusion_error)?
+ .map_err(to_datafusion_error);
+
+ Ok(Box::pin(stream))
+}
diff --git a/crates/integrations/datafusion/src/schema.rs
b/crates/integrations/datafusion/src/schema.rs
new file mode 100644
index 0000000..2ba6962
--- /dev/null
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::error::Result as DFResult;
+use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
+use futures::future::try_join_all;
+use iceberg::{Catalog, NamespaceIdent, Result};
+
+use crate::table::IcebergTableProvider;
+
+/// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing
+/// access to table providers within a specific namespace.
+pub(crate) struct IcebergSchemaProvider {
+ /// A `HashMap` where keys are table names
+ /// and values are dynamic references to objects implementing the
+ /// [`TableProvider`] trait.
+ tables: HashMap<String, Arc<dyn TableProvider>>,
+}
+
+impl IcebergSchemaProvider {
+ /// Asynchronously tries to construct a new [`IcebergSchemaProvider`]
+ /// using the given client to fetch and initialize table providers for
+ /// the provided namespace in the Iceberg [`Catalog`].
+ ///
+ /// This method retrieves a list of table names
+ /// attempts to create a table provider for each table name, and
+ /// collects these providers into a `HashMap`.
+ pub(crate) async fn try_new(
+ client: Arc<dyn Catalog>,
+ namespace: NamespaceIdent,
+ ) -> Result<Self> {
+ // TODO:
+ // Tables and providers should be cached based on table_name
+ // if we have a cache miss; we update our internal cache & check again
+ // As of right now; tables might become stale.
+ let table_names: Vec<_> = client
+ .list_tables(&namespace)
+ .await?
+ .iter()
+ .map(|tbl| tbl.name().to_string())
+ .collect();
+
+ let providers = try_join_all(
+ table_names
+ .iter()
+ .map(|name| IcebergTableProvider::try_new(client.clone(),
namespace.clone(), name))
+ .collect::<Vec<_>>(),
+ )
+ .await?;
+
+ let tables: HashMap<String, Arc<dyn TableProvider>> = table_names
+ .into_iter()
+ .zip(providers.into_iter())
+ .map(|(name, provider)| {
+ let provider = Arc::new(provider) as Arc<dyn TableProvider>;
+ (name, provider)
+ })
+ .collect();
+
+ Ok(IcebergSchemaProvider { tables })
+ }
+}
+
+#[async_trait]
+impl SchemaProvider for IcebergSchemaProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn table_names(&self) -> Vec<String> {
+ self.tables.keys().cloned().collect()
+ }
+
+ fn table_exist(&self, name: &str) -> bool {
+ self.tables.get(name).is_some()
+ }
+
+ async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn
TableProvider>>> {
+ Ok(self.tables.get(name).cloned())
+ }
+}
diff --git a/crates/integrations/datafusion/src/table.rs
b/crates/integrations/datafusion/src/table.rs
new file mode 100644
index 0000000..46a15f6
--- /dev/null
+++ b/crates/integrations/datafusion/src/table.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::error::Result as DFResult;
+use datafusion::{
+ arrow::datatypes::SchemaRef as ArrowSchemaRef,
+ datasource::{TableProvider, TableType},
+ execution::context,
+ logical_expr::Expr,
+ physical_plan::ExecutionPlan,
+};
+use iceberg::{
+ arrow::schema_to_arrow_schema, table::Table, Catalog, NamespaceIdent,
Result, TableIdent,
+};
+
+use crate::physical_plan::scan::IcebergTableScan;
+
+/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
+/// managing access to a [`Table`].
+pub(crate) struct IcebergTableProvider {
+ /// A table in the catalog.
+ table: Table,
+ /// A reference-counted arrow `Schema`.
+ schema: ArrowSchemaRef,
+}
+
+impl IcebergTableProvider {
+ /// Asynchronously tries to construct a new [`IcebergTableProvider`]
+ /// using the given client and table name to fetch an actual [`Table`]
+ /// in the provided namespace.
+ pub(crate) async fn try_new(
+ client: Arc<dyn Catalog>,
+ namespace: NamespaceIdent,
+ name: impl Into<String>,
+ ) -> Result<Self> {
+ let ident = TableIdent::new(namespace, name.into());
+ let table = client.load_table(&ident).await?;
+
+ let schema =
Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
+
+ Ok(IcebergTableProvider { table, schema })
+ }
+}
+
+#[async_trait]
+impl TableProvider for IcebergTableProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> ArrowSchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &context::SessionState,
+ _projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(IcebergTableScan::new(
+ self.table.clone(),
+ self.schema.clone(),
+ )))
+ }
+}
diff --git a/crates/integrations/datafusion/testdata/docker-compose.yaml
b/crates/integrations/datafusion/testdata/docker-compose.yaml
new file mode 100644
index 0000000..282dc66
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/docker-compose.yaml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+ minio:
+ image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+ expose:
+ - 9000
+ - 9001
+ environment:
+ - MINIO_ROOT_USER=admin
+ - MINIO_ROOT_PASSWORD=password
+ - MINIO_DOMAIN=minio
+ command: [ "server", "/data", "--console-address", ":9001" ]
+
+ mc:
+ depends_on:
+ - minio
+ image: minio/mc:RELEASE.2024-03-07T00-31-49Z
+ environment:
+ - AWS_ACCESS_KEY_ID=admin
+ - AWS_SECRET_ACCESS_KEY=password
+ - AWS_REGION=us-east-1
+ entrypoint: >
+ /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000
admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb
minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f
/dev/null "
+
+ hive-metastore:
+ image: iceberg-hive-metastore
+ build: ./hms_catalog/
+ expose:
+ - 9083
+ environment:
+ SERVICE_NAME: "metastore"
+ SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"
diff --git a/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
b/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
new file mode 100644
index 0000000..ff8c9fa
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM openjdk:8-jre-slim AS build
+
+RUN apt-get update -qq && apt-get -qq -y install curl
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+RUN curl
https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar
-Lo /tmp/aws-java-sdk-bundle-1.11.271.jar
+RUN curl
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar
-Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
+
+
+FROM apache/hive:3.1.3
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar
/opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
+COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar
/opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar
+COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml
\ No newline at end of file
diff --git a/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
b/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
new file mode 100644
index 0000000..f0583a0
--- /dev/null
+++ b/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
@@ -0,0 +1,51 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>s3a://warehouse/hive</value>
+ </property>
+ <property>
+ <name>fs.s3a.impl</name>
+ <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+ </property>
+ <property>
+ <name>fs.s3a.fast.upload</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>fs.s3a.endpoint</name>
+ <value>http://minio:9000</value>
+ </property>
+ <property>
+ <name>fs.s3a.access.key</name>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>fs.s3a.secret.key</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>fs.s3a.connection.ssl.enabled</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>fs.s3a.path.style.access</name>
+ <value>true</value>
+ </property>
+</configuration>
\ No newline at end of file
diff --git
a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
new file mode 100644
index 0000000..20c5cc8
--- /dev/null
+++ b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for Iceberg Datafusion with Hive Metastore.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::DataType;
+use datafusion::execution::context::SessionContext;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
+use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
+use iceberg_datafusion::IcebergCatalogProvider;
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+// Thrift port exposed by the hive-metastore container.
+const HMS_CATALOG_PORT: u16 = 9083;
+// S3 API port exposed by the minio container.
+const MINIO_PORT: u16 = 9000;
+
+// Per-test environment: the docker-compose handle plus an HMS catalog client
+// connected to the containerized metastore.
+struct TestFixture {
+ // Held only to keep the containers alive for the test's lifetime
+ // (presumably torn down on drop — see DockerCompose in iceberg_test_utils).
+ _docker_compose: DockerCompose,
+ hms_catalog: HmsCatalog,
+}
+
+/// Spins up the docker-compose environment (hive-metastore + minio) for `func`,
+/// waits until the HMS thrift port accepts connections, and returns a fixture
+/// holding the compose handle and a connected `HmsCatalog` client.
+///
+/// Panics if the metastore is not reachable after a bounded number of attempts;
+/// previously the wait loop was unbounded and could hang CI forever.
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    docker_compose.run();
+
+    let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
+    let minio_ip = docker_compose.get_container_ip("minio");
+
+    // Bounded readiness probe: one scan per second, up to MAX_ATTEMPTS.
+    const MAX_ATTEMPTS: u32 = 60;
+    let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
+    let mut attempts = 0u32;
+    while !scan_port_addr(&read_port) {
+        attempts += 1;
+        assert!(
+            attempts < MAX_ATTEMPTS,
+            "hms catalog at {} not ready after {} attempts",
+            read_port,
+            MAX_ATTEMPTS
+        );
+        log::info!("Waiting for 1s hms catalog to ready...");
+        sleep(std::time::Duration::from_millis(1000)).await;
+    }
+
+    // S3 connection properties for the warehouse; must agree with
+    // testdata/hms_catalog/core-site.xml.
+    let props = HashMap::from([
+        (
+            S3_ENDPOINT.to_string(),
+            format!("http://{}:{}", minio_ip, MINIO_PORT),
+        ),
+        (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+        (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+        (S3_REGION.to_string(), "us-east-1".to_string()),
+    ]);
+
+    let config = HmsCatalogConfig::builder()
+        .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
+        .thrift_transport(HmsThriftTransport::Buffered)
+        .warehouse("s3a://warehouse/hive".to_string())
+        .props(props)
+        .build();
+
+    let hms_catalog = HmsCatalog::new(config).unwrap();
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        hms_catalog,
+    }
+}
+
+/// Builds a `TableCreation` request at `location` named `name`, using the
+/// two-column schema shared by all tests: a required int `foo` and a required
+/// string `bar`.
+fn set_table_creation(location: impl ToString, name: impl ToString) -> Result<TableCreation> {
+    let fields = vec![
+        NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
+    ];
+
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(fields)
+        .build()?;
+
+    Ok(TableCreation::builder()
+        .location(location.to_string())
+        .name(name.to_string())
+        .properties(HashMap::new())
+        .schema(schema)
+        .build())
+}
+
+#[tokio::test]
+/// Creates a table through HMS, then checks that DataFusion's `TableProvider`
+/// reports the expected Arrow schema (names, types, nullability).
+async fn test_provider_get_table_schema() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_get_table_schema").await;
+
+    let namespace = NamespaceIdent::new("default".to_string());
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+
+    fixture
+        .hms_catalog
+        .create_table(&namespace, creation)
+        .await?;
+
+    let client = Arc::new(fixture.hms_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog);
+
+    let provider = ctx.catalog("hive").unwrap();
+    let schema = provider.schema("default").unwrap();
+
+    let table = schema.table("my_table").await.unwrap().unwrap();
+    let table_schema = table.schema();
+
+    let expected = [("foo", &DataType::Int32), ("bar", &DataType::Utf8)];
+
+    // `zip` truncates to the shorter side, so without this count check an
+    // empty or partial schema would silently pass the loop below.
+    assert_eq!(table_schema.fields().len(), expected.len());
+
+    for (field, exp) in table_schema.fields().iter().zip(expected.iter()) {
+        assert_eq!(field.name(), exp.0);
+        assert_eq!(field.data_type(), exp.1);
+        assert!(!field.is_nullable())
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+/// Registers one table in the `default` namespace and verifies that the
+/// DataFusion `SchemaProvider` lists exactly that table name.
+async fn test_provider_list_table_names() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_list_table_names").await;
+
+    let namespace = NamespaceIdent::new("default".to_string());
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    fixture
+        .hms_catalog
+        .create_table(&namespace, creation)
+        .await?;
+
+    let catalog_provider =
+        Arc::new(IcebergCatalogProvider::try_new(Arc::new(fixture.hms_catalog)).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog_provider);
+
+    let schema_provider = ctx.catalog("hive").unwrap().schema("default").unwrap();
+
+    assert_eq!(schema_provider.table_names(), vec!["my_table"]);
+
+    Ok(())
+}
+
+#[tokio::test]
+/// Verifies that the `CatalogProvider` lists the metastore's schemas
+/// (the stock HMS ships with a single `default` database).
+async fn test_provider_list_schema_names() -> Result<()> {
+    let fixture = set_test_fixture("test_provider_list_schema_names").await;
+    // NOTE: a `set_table_creation(...)` call whose result was discarded has been
+    // removed — listing schema names requires no table to exist.
+
+    let client = Arc::new(fixture.hms_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("hive", catalog);
+
+    let provider = ctx.catalog("hive").unwrap();
+
+    let expected = vec!["default"];
+    let result = provider.schema_names();
+
+    assert_eq!(result, expected);
+
+    Ok(())
+}