This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c3c60d05 feat(catalog): implement catalog loader for glue (#1603)
c3c60d05 is described below
commit c3c60d055d1e6b28253a848986a818224425a43a
Author: Leon Lin <[email protected]>
AuthorDate: Tue Aug 19 03:25:29 2025 -0700
feat(catalog): implement catalog loader for glue (#1603)
## Which issue does this PR close?
- Closes [#1259](https://github.com/apache/iceberg-rust/issues/1259).
## What changes are included in this PR?
* Add `GlueCatalogBuilder`
* Implement the `CatalogBuilder` trait for `GlueCatalogBuilder`
* Support the `"glue"` catalog type in the catalog loader
## Are these changes tested?
Yes. Added a glue case to the catalog loader tests and updated the glue integration tests to build the catalog via `GlueCatalogBuilder`.
---
Cargo.lock | 1 +
Cargo.toml | 1 +
crates/catalog/glue/Cargo.toml | 1 -
crates/catalog/glue/src/catalog.rs | 92 +++++++++++++++++++++++---
crates/catalog/glue/src/lib.rs | 24 +++++++
crates/catalog/glue/tests/glue_catalog_test.rs | 28 +++++---
crates/catalog/loader/Cargo.toml | 5 +-
crates/catalog/loader/src/lib.rs | 27 +++++++-
8 files changed, 157 insertions(+), 22 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index dbe04015..0985c71f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3613,6 +3613,7 @@ version = "0.6.0"
dependencies = [
"async-trait",
"iceberg",
+ "iceberg-catalog-glue",
"iceberg-catalog-rest",
"tokio",
]
diff --git a/Cargo.toml b/Cargo.toml
index a585be7d..4f03a202 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,6 +77,7 @@ hive_metastore = "0.1"
http = "1.2"
iceberg = { version = "0.6.0", path = "./crates/iceberg" }
iceberg-catalog-rest = { version = "0.6.0", path = "./crates/catalog/rest" }
+iceberg-catalog-glue = { version = "0.6.0", path = "./crates/catalog/glue" }
iceberg-datafusion = { version = "0.6.0", path =
"./crates/integrations/datafusion" }
indicatif = "0.17"
itertools = "0.13"
diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml
index 613160e4..b6126021 100644
--- a/crates/catalog/glue/Cargo.toml
+++ b/crates/catalog/glue/Cargo.toml
@@ -37,7 +37,6 @@ iceberg = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
-typed-builder = { workspace = true }
[dev-dependencies]
ctor = { workspace = true }
diff --git a/crates/catalog/glue/src/catalog.rs
b/crates/catalog/glue/src/catalog.rs
index fb4bd36b..c7584b59 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -26,10 +26,9 @@ use iceberg::io::{
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
- Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent,
Result, TableCommit,
- TableCreation, TableIdent,
+ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace,
NamespaceIdent, Result,
+ TableCommit, TableCreation, TableIdent,
};
-use typed_builder::TypedBuilder;
use crate::error::{from_aws_build_error, from_aws_sdk_error};
use crate::utils::{
@@ -40,15 +39,90 @@ use crate::{
AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN, with_catalog_id,
};
-#[derive(Debug, TypedBuilder)]
+/// Glue catalog URI
+pub const GLUE_CATALOG_PROP_URI: &str = "uri";
+/// Glue catalog id
+pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
+/// Glue catalog warehouse location
+pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
+
+/// Builder for [`GlueCatalog`].
+#[derive(Debug)]
+pub struct GlueCatalogBuilder(GlueCatalogConfig);
+
+impl Default for GlueCatalogBuilder {
+ fn default() -> Self {
+ Self(GlueCatalogConfig {
+ name: None,
+ uri: None,
+ catalog_id: None,
+ warehouse: "".to_string(),
+ props: HashMap::new(),
+ })
+ }
+}
+
+impl CatalogBuilder for GlueCatalogBuilder {
+ type C = GlueCatalog;
+
+ fn load(
+ mut self,
+ name: impl Into<String>,
+ props: HashMap<String, String>,
+ ) -> impl Future<Output = Result<Self::C>> + Send {
+ self.0.name = Some(name.into());
+
+ if props.contains_key(GLUE_CATALOG_PROP_URI) {
+ self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
+ }
+
+ if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
+ self.0.catalog_id =
props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
+ }
+
+ if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
+ self.0.warehouse = props
+ .get(GLUE_CATALOG_PROP_WAREHOUSE)
+ .cloned()
+ .unwrap_or_default();
+ }
+
+ // Collect other remaining properties
+ self.0.props = props
+ .into_iter()
+ .filter(|(k, _)| {
+ k != GLUE_CATALOG_PROP_URI
+ && k != GLUE_CATALOG_PROP_CATALOG_ID
+ && k != GLUE_CATALOG_PROP_WAREHOUSE
+ })
+ .collect();
+
+ async move {
+ if self.0.name.is_none() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Catalog name is required",
+ ));
+ }
+ if self.0.warehouse.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Catalog warehouse is required",
+ ));
+ }
+
+ GlueCatalog::new(self.0).await
+ }
+ }
+}
+
+#[derive(Debug)]
/// Glue Catalog configuration
-pub struct GlueCatalogConfig {
- #[builder(default, setter(strip_option(fallback = uri_opt)))]
+pub(crate) struct GlueCatalogConfig {
+ name: Option<String>,
uri: Option<String>,
- #[builder(default, setter(strip_option(fallback = catalog_id_opt)))]
catalog_id: Option<String>,
warehouse: String,
- #[builder(default)]
props: HashMap<String, String>,
}
@@ -71,7 +145,7 @@ impl Debug for GlueCatalog {
impl GlueCatalog {
/// Create a new glue catalog
- pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
+ async fn new(config: GlueCatalogConfig) -> Result<Self> {
let sdk_config = create_sdk_config(&config.props,
config.uri.as_ref()).await;
let mut file_io_props = config.props.clone();
if !file_io_props.contains_key(S3_ACCESS_KEY_ID) {
diff --git a/crates/catalog/glue/src/lib.rs b/crates/catalog/glue/src/lib.rs
index 23765733..1b9efe37 100644
--- a/crates/catalog/glue/src/lib.rs
+++ b/crates/catalog/glue/src/lib.rs
@@ -16,6 +16,30 @@
// under the License.
//! Iceberg Glue Catalog implementation.
+//!
+//! To build a glue catalog with configurations
+//! # Example
+//!
+//! ```rust, no_run
+//! use std::collections::HashMap;
+//!
+//! use iceberg::CatalogBuilder;
+//! use iceberg_catalog_glue::{GLUE_CATALOG_PROP_WAREHOUSE,
GlueCatalogBuilder};
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let catalog = GlueCatalogBuilder::default()
+//! .load(
+//! "glue",
+//! HashMap::from([(
+//! GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
+//! "s3://warehouse".to_string(),
+//! )]),
+//! )
+//! .await
+//! .unwrap();
+//! }
+//! ```
#![deny(missing_docs)]
diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs
b/crates/catalog/glue/tests/glue_catalog_test.rs
index 2f7b1052..14d894ae 100644
--- a/crates/catalog/glue/tests/glue_catalog_test.rs
+++ b/crates/catalog/glue/tests/glue_catalog_test.rs
@@ -24,9 +24,12 @@ use std::sync::RwLock;
use ctor::{ctor, dtor};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
-use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation,
TableIdent};
+use iceberg::{
+ Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result, TableCreation,
TableIdent,
+};
use iceberg_catalog_glue::{
- AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GlueCatalog,
GlueCatalogConfig,
+ AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY,
GLUE_CATALOG_PROP_URI,
+ GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalog, GlueCatalogBuilder,
};
use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};
@@ -112,13 +115,22 @@ async fn get_catalog() -> GlueCatalog {
retries += 1;
}
- let config = GlueCatalogConfig::builder()
- .uri(format!("http://{}", glue_socket_addr))
- .warehouse("s3a://warehouse/hive".to_string())
- .props(props.clone())
- .build();
+ let mut glue_props = HashMap::from([
+ (
+ GLUE_CATALOG_PROP_URI.to_string(),
+ format!("http://{}", glue_socket_addr),
+ ),
+ (
+ GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
+ "s3a://warehouse/hive".to_string(),
+ ),
+ ]);
+ glue_props.extend(props.clone());
- GlueCatalog::new(config).await.unwrap()
+ GlueCatalogBuilder::default()
+ .load("glue", glue_props)
+ .await
+ .unwrap()
}
async fn set_test_namespace(catalog: &GlueCatalog, namespace: &NamespaceIdent)
-> Result<()> {
diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml
index d29edad0..136847d9 100644
--- a/crates/catalog/loader/Cargo.toml
+++ b/crates/catalog/loader/Cargo.toml
@@ -30,6 +30,7 @@ repository = { workspace = true }
[dependencies]
iceberg = { workspace = true }
-iceberg-catalog-rest = {workspace = true}
+iceberg-catalog-rest = { workspace = true }
+iceberg-catalog-glue = { workspace = true }
tokio = { workspace = true }
-async-trait = {workspace = true}
+async-trait = { workspace = true }
diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs
index e5fce468..c1e88b0c 100644
--- a/crates/catalog/loader/src/lib.rs
+++ b/crates/catalog/loader/src/lib.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result};
+use iceberg_catalog_glue::GlueCatalogBuilder;
use iceberg_catalog_rest::RestCatalogBuilder;
#[async_trait]
@@ -46,6 +47,7 @@ impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T {
pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> {
match r#type {
"rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box<dyn
BoxedCatalogBuilder>),
+ "glue" => Ok(Box::new(GlueCatalogBuilder::default()) as Box<dyn
BoxedCatalogBuilder>),
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Unsupported catalog type: {}", r#type),
@@ -57,12 +59,12 @@ pub fn load(r#type: &str) -> Result<Box<dyn
BoxedCatalogBuilder>> {
mod tests {
use std::collections::HashMap;
- use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
-
use crate::load;
#[tokio::test]
async fn test_load_rest_catalog() {
+ use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
+
let catalog_loader = load("rest").unwrap();
let catalog = catalog_loader
.load(
@@ -79,4 +81,25 @@ mod tests {
assert!(catalog.is_ok());
}
+
+ #[tokio::test]
+ async fn test_load_glue_catalog() {
+ use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;
+
+ let catalog_loader = load("glue").unwrap();
+ let catalog = catalog_loader
+ .load(
+ "glue".to_string(),
+ HashMap::from([
+ (
+ GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
+ "s3://test".to_string(),
+ ),
+ ("key".to_string(), "value".to_string()),
+ ]),
+ )
+ .await;
+
+ assert!(catalog.is_ok());
+ }
}