liurenjie1024 commented on code in PR #229:
URL: https://github.com/apache/iceberg-rust/pull/229#discussion_r1579259139


##########
crates/catalog/sql/Cargo.toml:
##########
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-catalog-sql"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Rust Sql Catalog"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "sql", "catalog"]
+
+[dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
+chrono = { workspace = true }
+dashmap = "5.5.3"
+futures = { workspace = true }
+iceberg = { workspace = true }
+log = { workspace = true }
+opendal = { workspace = true }
+serde = { workspace = true }
+serde_derive = { workspace = true }
+serde_json = { workspace = true }
+sqlx = { version = "0.7.4", features = ["tls-rustls", "any", "sqlite", 
"postgres", "mysql"], default-features = false }
+typed-builder = { workspace = true }
+url = { workspace = true }
+urlencoding = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
+
+[dev-dependencies]
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", 
"sqlite", "postgres", "mysql","migrate"], default-features = false }

Review Comment:
   Ditto.



##########
crates/catalog/sql/Cargo.toml:
##########
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-catalog-sql"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Rust Sql Catalog"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "sql", "catalog"]
+
+[dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
+chrono = { workspace = true }
+dashmap = "5.5.3"
+futures = { workspace = true }
+iceberg = { workspace = true }
+log = { workspace = true }
+opendal = { workspace = true }
+serde = { workspace = true }
+serde_derive = { workspace = true }
+serde_json = { workspace = true }
+sqlx = { version = "0.7.4", features = ["tls-rustls", "any", "sqlite", 
"postgres", "mysql"], default-features = false }

Review Comment:
   Is it possible to add a feature flag for each database backend so that the 
binary doesn't contain the implementations for all of them?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(true);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.url)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            &("create table if not exists ".to_string()
+                + CATALOG_TABLE_VIEW_NAME
+                + " ("
+                + CATALOG_NAME
+                + " varchar(255) not null,"
+                + TABLE_NAMESPACE
+                + " varchar(255) not null,"
+                + TABLE_NAME
+                + " varchar(255) not null,"
+                + METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + RECORD_TYPE
+                + " varchar(5), primary key ("
+                + CATALOG_NAME
+                + ", "
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ")
+                        );"),
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            "create table if not exists iceberg_namespace_properties (
+                            catalog_name varchar(255) not null,
+                            namespace varchar(255) not null,
+                            property_key varchar(255),
+                            property_value varchar(255),
+                            primary key (catalog_name, namespace, property_key)
+                        );",
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(SqlCatalog {
+            name: config.name.to_owned(),
+            connection: pool,
+            storage: file_io,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,

Review Comment:
   The `_parent` should not be ignored.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,

Review Comment:
   ```suggestion
       fileio: FileIO,
   ```
   
   It's not blocking, but `storage` seems confusing to me.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(true);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.url)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            &("create table if not exists ".to_string()
+                + CATALOG_TABLE_VIEW_NAME
+                + " ("
+                + CATALOG_NAME
+                + " varchar(255) not null,"
+                + TABLE_NAMESPACE
+                + " varchar(255) not null,"
+                + TABLE_NAME
+                + " varchar(255) not null,"
+                + METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + RECORD_TYPE
+                + " varchar(5), primary key ("
+                + CATALOG_NAME
+                + ", "
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ")
+                        );"),
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            "create table if not exists iceberg_namespace_properties (
+                            catalog_name varchar(255) not null,
+                            namespace varchar(255) not null,
+                            property_key varchar(255),
+                            property_value varchar(255),
+                            primary key (catalog_name, namespace, property_key)
+                        );",
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(SqlCatalog {
+            name: config.name.to_owned(),
+            connection: pool,
+            storage: file_io,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let name = self.name.clone();
+        let rows = sqlx::query(
+            "select distinct table_namespace from iceberg_tables where 
catalog_name = ?;",

Review Comment:
   This seems incorrect to me: if a namespace has no table or view, it's 
missing from the result. Should we query the namespace properties table instead?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,

Review Comment:
   I'm hesitant to add a cache here; maybe we can add something like `CachedCatalog` 
as in Java, so that all catalog implementations could benefit from it?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")

Review Comment:
   I would suggest making these configuration keys and their default values 
named constants, but it's not a blocker. 



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,

Review Comment:
   Other catalogs follow the pattern of `uri`; it would be better to make this 
consistent.



##########
crates/iceberg/src/spec/table_metadata.rs:
##########
@@ -867,10 +866,12 @@ pub(super) mod _serde {
 #[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
 #[repr(u8)]
 /// Iceberg format version
+#[derive(Default)]

Review Comment:
   Is it possible to put this derive on the same line as the other derives?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(true);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.url)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            &("create table if not exists ".to_string()
+                + CATALOG_TABLE_VIEW_NAME
+                + " ("
+                + CATALOG_NAME
+                + " varchar(255) not null,"
+                + TABLE_NAMESPACE
+                + " varchar(255) not null,"
+                + TABLE_NAME
+                + " varchar(255) not null,"
+                + METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + RECORD_TYPE
+                + " varchar(5), primary key ("
+                + CATALOG_NAME
+                + ", "
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ")
+                        );"),
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            "create table if not exists iceberg_namespace_properties (
+                            catalog_name varchar(255) not null,
+                            namespace varchar(255) not null,
+                            property_key varchar(255),
+                            property_value varchar(255),
+                            primary key (catalog_name, namespace, property_key)
+                        );",
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(SqlCatalog {
+            name: config.name.to_owned(),
+            connection: pool,
+            storage: file_io,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let name = self.name.clone();
+        let rows = sqlx::query(
+            "select distinct table_namespace from iceberg_tables where 
catalog_name = ?;",
+        )
+        .bind(&name)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    NamespaceIdent::from_vec(
+                        
y.split('.').map(ToString::to_string).collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        todo!()
+    }
+
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let name = self.name.clone();
+        let namespace = namespace.join(".");
+        let rows = sqlx::query(
+            &("select ".to_string()
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ", "
+                + METADATA_LOCATION_PROP
+                + ", "
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " from "
+                + CATALOG_TABLE_VIEW_NAME
+                + " where "
+                + CATALOG_NAME
+                + " = ? and "
+                + TABLE_NAMESPACE
+                + "= ?;"),

Review Comment:
   Should we also add a `record_type = ?` condition to the `where` clause?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(true);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.url)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            &("create table if not exists ".to_string()
+                + CATALOG_TABLE_VIEW_NAME
+                + " ("
+                + CATALOG_NAME
+                + " varchar(255) not null,"
+                + TABLE_NAMESPACE
+                + " varchar(255) not null,"
+                + TABLE_NAME
+                + " varchar(255) not null,"
+                + METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + RECORD_TYPE
+                + " varchar(5), primary key ("
+                + CATALOG_NAME
+                + ", "
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ")
+                        );"),
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            "create table if not exists iceberg_namespace_properties (
+                            catalog_name varchar(255) not null,
+                            namespace varchar(255) not null,
+                            property_key varchar(255),
+                            property_value varchar(255),
+                            primary key (catalog_name, namespace, property_key)
+                        );",
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(SqlCatalog {
+            name: config.name.to_owned(),
+            connection: pool,
+            storage: file_io,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let name = self.name.clone();
+        let rows = sqlx::query(
+            "select distinct table_namespace from iceberg_tables where 
catalog_name = ?;",
+        )
+        .bind(&name)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    NamespaceIdent::from_vec(
+                        
y.split('.').map(ToString::to_string).collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        todo!()
+    }
+
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let name = self.name.clone();
+        let namespace = namespace.join(".");
+        let rows = sqlx::query(
+            &("select ".to_string()
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ", "
+                + METADATA_LOCATION_PROP
+                + ", "
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " from "
+                + CATALOG_TABLE_VIEW_NAME
+                + " where "
+                + CATALOG_NAME
+                + " = ? and "
+                + TABLE_NAMESPACE
+                + "= ?;"),
+        )
+        .bind(&name)
+        .bind(&namespace)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(query_map);
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    let namespace = NamespaceIdent::from_vec(
+                        y.table_namespace
+                            .split('.')
+                            .map(ToString::to_string)
+                            .collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
+                    Ok(TableIdent::new(namespace, y.table_name))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
+        let catalog_name = self.name.clone();
+        let namespace = identifier.namespace().encode_in_url();
+        let name = identifier.name().to_string();
+        let rows = sqlx::query(
+            &("select ".to_string()
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ", "
+                + METADATA_LOCATION_PROP
+                + ", "
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " from "
+                + CATALOG_TABLE_VIEW_NAME
+                + " where "
+                + CATALOG_NAME
+                + " = ? and "
+                + TABLE_NAMESPACE
+                + " = ? and "
+                + TABLE_NAME
+                + " = ?;"),

Review Comment:
   Add `record_type = ?`?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(true);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.url)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            &("create table if not exists ".to_string()
+                + CATALOG_TABLE_VIEW_NAME
+                + " ("
+                + CATALOG_NAME
+                + " varchar(255) not null,"
+                + TABLE_NAMESPACE
+                + " varchar(255) not null,"
+                + TABLE_NAME
+                + " varchar(255) not null,"
+                + METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + RECORD_TYPE
+                + " varchar(5), primary key ("
+                + CATALOG_NAME
+                + ", "
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ")
+                        );"),
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            "create table if not exists iceberg_namespace_properties (
+                            catalog_name varchar(255) not null,
+                            namespace varchar(255) not null,
+                            property_key varchar(255),
+                            property_value varchar(255),
+                            primary key (catalog_name, namespace, property_key)
+                        );",
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(SqlCatalog {
+            name: config.name.to_owned(),
+            connection: pool,
+            storage: file_io,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let name = self.name.clone();
+        let rows = sqlx::query(
+            "select distinct table_namespace from iceberg_tables where 
catalog_name = ?;",
+        )
+        .bind(&name)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    NamespaceIdent::from_vec(
+                        
y.split('.').map(ToString::to_string).collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        todo!()
+    }
+
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let name = self.name.clone();
+        let namespace = namespace.join(".");
+        let rows = sqlx::query(
+            &("select ".to_string()
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ", "
+                + METADATA_LOCATION_PROP
+                + ", "
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " from "
+                + CATALOG_TABLE_VIEW_NAME
+                + " where "
+                + CATALOG_NAME
+                + " = ? and "
+                + TABLE_NAMESPACE
+                + "= ?;"),
+        )
+        .bind(&name)
+        .bind(&namespace)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(query_map);
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    let namespace = NamespaceIdent::from_vec(
+                        y.table_namespace
+                            .split('.')
+                            .map(ToString::to_string)
+                            .collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
+                    Ok(TableIdent::new(namespace, y.table_name))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
+        let catalog_name = self.name.clone();
+        let namespace = identifier.namespace().encode_in_url();
+        let name = identifier.name().to_string();
+        let rows = sqlx::query(
+            &("select ".to_string()
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ", "
+                + METADATA_LOCATION_PROP
+                + ", "
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " from "
+                + CATALOG_TABLE_VIEW_NAME
+                + " where "
+                + CATALOG_NAME
+                + " = ? and "
+                + TABLE_NAMESPACE
+                + " = ? and "
+                + TABLE_NAME
+                + " = ?;"),
+        )
+        .bind(&catalog_name)
+        .bind(&namespace)
+        .bind(&name)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+        let mut iter = rows.iter().map(query_map);
+
+        Ok(iter.next().is_some())
+    }
+
+    async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
+        let metadata_location = {
+            let catalog_name = self.name.clone();
+            let namespace = identifier.namespace().encode_in_url();
+            let name = identifier.name().to_string();
+            let row = sqlx::query(
+                &("select ".to_string()
+                    + TABLE_NAMESPACE
+                    + ", "
+                    + TABLE_NAME
+                    + ", "
+                    + METADATA_LOCATION_PROP
+                    + ", "
+                    + PREVIOUS_METADATA_LOCATION_PROP
+                    + " from "
+                    + CATALOG_TABLE_VIEW_NAME
+                    + " where "
+                    + CATALOG_NAME
+                    + " = ? and "
+                    + TABLE_NAMESPACE
+                    + " = ? and "
+                    + TABLE_NAME
+                    + " = ?;"),
+            )
+            .bind(&catalog_name)
+            .bind(&namespace)
+            .bind(&name)
+            .fetch_one(&self.connection)
+            .await
+            .map_err(from_sqlx_error)?;
+            let row = query_map(&row).map_err(from_sqlx_error)?;
+
+            row.metadata_location
+        };
+        let file = self.storage.new_input(&metadata_location)?;
+
+        let mut json = String::new();
+        file.reader().await?.read_to_string(&mut json).await?;
+
+        let metadata: TableMetadata = serde_json::from_str(&json)?;
+
+        self.cache
+            .insert(identifier.clone(), (metadata_location, metadata.clone()));
+
+        let table = Table::builder()
+            .file_io(self.storage.clone())
+            .identifier(identifier.clone())
+            .metadata(metadata)
+            .build();
+
+        Ok(table)
+    }
+
+    async fn create_table(
+        &self,
+        namespace: &NamespaceIdent,
+        creation: TableCreation,
+    ) -> Result<Table> {
+        let location = creation.location.as_ref().ok_or(Error::new(
+            ErrorKind::DataInvalid,
+            "Table creation with the Sql catalog requires a location.",
+        ))?;
+        let name = creation.name.clone();
+
+        let uuid = Uuid::new_v4();
+        let metadata_location =
+            location.clone() + "/metadata/" + "0-" + &uuid.to_string() + 
".metadata.json";
+
+        let metadata = 
TableMetadataBuilder::from_table_creation(creation)?.build()?;
+
+        let file = self.storage.new_output(&metadata_location)?;
+        file.writer()
+            .await?
+            .write_all(&serde_json::to_vec(&metadata)?)
+            .await?;
+        {
+            let catalog_name = self.name.clone();
+            let namespace = namespace.encode_in_url();
+            let name = name.clone();
+            let metadata_location = metadata_location.to_string();
+
+            sqlx::query(
+                &("insert into ".to_string()
+                    + CATALOG_TABLE_VIEW_NAME
+                    + " ("
+                    + CATALOG_NAME
+                    + ", "
+                    + TABLE_NAMESPACE
+                    + ", "
+                    + TABLE_NAME
+                    + ", "
+                    + METADATA_LOCATION_PROP
+                    + ") values (?, ?, ?, ?);"),
+            )
+            .bind(&catalog_name)
+            .bind(&namespace)
+            .bind(&name)
+            .bind(&metadata_location)
+            .execute(&self.connection)
+            .await
+            .map_err(from_sqlx_error)?;
+        }
+
+        Ok(Table::builder()
+            .file_io(self.storage.clone())
+            .metadata_location(metadata_location)
+            .identifier(TableIdent::new(namespace.clone(), name))
+            .metadata(metadata)
+            .build())
+    }
+
+    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> 
Result<()> {
+        todo!()
+    }
+
+    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
+        todo!()
+    }
+}
+
+#[cfg(test)]
+pub mod tests {
+    use iceberg::{
+        spec::{NestedField, PrimitiveType, Schema, Type},
+        Catalog, NamespaceIdent, TableCreation, TableIdent,
+    };
+    use tempfile::TempDir;
+
+    use crate::{SqlCatalog, SqlCatalogConfig};
+    use sqlx::migrate::MigrateDatabase;
+
+    #[tokio::test]
+    async fn test_create_update_drop_table() {
+        let dir = TempDir::with_prefix("sql-test").unwrap();
+        let warehouse_root = dir.path().to_str().unwrap();
+
+        //name of the database should be part of the url. usually for sqllite 
it creates or opens one if (.db found)
+        let sql_lite_url = "sqlite://iceberg";

Review Comment:
   Is it possible to use an in-memory database or a temporary file so the
tests are reproducible?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(true);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.url)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            &("create table if not exists ".to_string()
+                + CATALOG_TABLE_VIEW_NAME
+                + " ("
+                + CATALOG_NAME
+                + " varchar(255) not null,"
+                + TABLE_NAMESPACE
+                + " varchar(255) not null,"
+                + TABLE_NAME
+                + " varchar(255) not null,"
+                + METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + RECORD_TYPE
+                + " varchar(5), primary key ("
+                + CATALOG_NAME
+                + ", "
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ")
+                        );"),
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            "create table if not exists iceberg_namespace_properties (
+                            catalog_name varchar(255) not null,
+                            namespace varchar(255) not null,
+                            property_key varchar(255),
+                            property_value varchar(255),
+                            primary key (catalog_name, namespace, property_key)
+                        );",
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(SqlCatalog {
+            name: config.name.to_owned(),
+            connection: pool,
+            storage: file_io,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let name = self.name.clone();

Review Comment:
   Do we need to clone here?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    url: String,
+    name: String,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(10);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(true);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.url)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            &("create table if not exists ".to_string()
+                + CATALOG_TABLE_VIEW_NAME
+                + " ("
+                + CATALOG_NAME
+                + " varchar(255) not null,"
+                + TABLE_NAMESPACE
+                + " varchar(255) not null,"
+                + TABLE_NAME
+                + " varchar(255) not null,"
+                + METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + PREVIOUS_METADATA_LOCATION_PROP
+                + " varchar(255),"
+                + RECORD_TYPE
+                + " varchar(5), primary key ("
+                + CATALOG_NAME
+                + ", "
+                + TABLE_NAMESPACE
+                + ", "
+                + TABLE_NAME
+                + ")
+                        );"),
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            "create table if not exists iceberg_namespace_properties (
+                            catalog_name varchar(255) not null,
+                            namespace varchar(255) not null,
+                            property_key varchar(255),
+                            property_value varchar(255),
+                            primary key (catalog_name, namespace, property_key)
+                        );",
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(SqlCatalog {
+            name: config.name.to_owned(),
+            connection: pool,
+            storage: file_io,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let name = self.name.clone();
+        let rows = sqlx::query(
+            "select distinct table_namespace from iceberg_tables where 
catalog_name = ?;",
+        )
+        .bind(&name)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    NamespaceIdent::from_vec(
+                        
y.split('.').map(ToString::to_string).collect::<Vec<_>>(),

Review Comment:
   This pattern appears several times — why not extract it into a standalone helper method?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{AsyncReadExt, AsyncWriteExt};
+use sqlx::any::AnyPoolOptions;
+use sqlx::{
+    any::{install_default_drivers, AnyRow},
+    AnyPool, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO,
+    spec::{TableMetadata, TableMetadataBuilder},
+    table::Table,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+// Identifiers for the backing SQL schema: the bookkeeping table name and
+// its column names, shared by every query this catalog issues.
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    /// Database connection URL (sqlite/postgres/mysql via sqlx's "any" driver).
+    url: String,
+    /// Name of this catalog; written as the `catalog_name` column value.
+    name: String,
+    /// Warehouse root location used to build the catalog's `FileIO`.
+    warehouse: String,
+    /// Extra properties: `pool.*` connection-pool options plus `FileIO` props.
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    /// Catalog name, stamped on every row this catalog reads or writes.
+    name: String,
+    /// sqlx connection pool over the backing database.
+    connection: AnyPool,
+    /// FileIO used to read/write table metadata files in the warehouse.
+    storage: FileIO,
+    /// Last-served (metadata_location, metadata) per table — presumably
+    /// consulted by commit to detect concurrent updates; TODO confirm.
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create a new sql catalog instance.
+    ///
+    /// Connects to the database at `config.url`, creates the catalog's
+    /// bookkeeping tables if they do not exist yet, and builds the `FileIO`
+    /// used to read/write table metadata under the warehouse.
+    ///
+    /// # Errors
+    /// Returns an error when a `pool.*` property is malformed, when the
+    /// database is unreachable, or when the `FileIO` cannot be built.
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+
+        // Parse an optional pool property, falling back to `default` when it
+        // is absent. The previous `.parse().unwrap()` would panic the whole
+        // process on a typo in user-supplied configuration; surface a
+        // DataInvalid error instead.
+        fn prop<T: std::str::FromStr>(
+            props: &HashMap<String, String>,
+            key: &str,
+            default: T,
+        ) -> Result<T>
+        where
+            T::Err: std::fmt::Display,
+        {
+            match props.get(key) {
+                Some(value) => value.parse().map_err(|err| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("invalid value for property {key}: {err}"),
+                    )
+                }),
+                None => Ok(default),
+            }
+        }
+
+        let max_connections: u32 = prop(&config.props, "pool.max-connections", 10)?;
+        let idle_timeout: u64 = prop(&config.props, "pool.idle-timeout", 10)?;
+        let test_before_acquire: bool = prop(&config.props, "pool.test-before-acquire", true)?;
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.url)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        // One row per iceberg table/view tracked by this catalog.
+        sqlx::query(&format!(
+            "create table if not exists {CATALOG_TABLE_VIEW_NAME} (
+                {CATALOG_NAME} varchar(255) not null,
+                {TABLE_NAMESPACE} varchar(255) not null,
+                {TABLE_NAME} varchar(255) not null,
+                {METADATA_LOCATION_PROP} varchar(255),
+                {PREVIOUS_METADATA_LOCATION_PROP} varchar(255),
+                {RECORD_TYPE} varchar(5),
+                primary key ({CATALOG_NAME}, {TABLE_NAMESPACE}, {TABLE_NAME})
+            );"
+        ))
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        // One row per namespace property key/value pair.
+        sqlx::query(
+            "create table if not exists iceberg_namespace_properties (
+                catalog_name varchar(255) not null,
+                namespace varchar(255) not null,
+                property_key varchar(255),
+                property_value varchar(255),
+                primary key (catalog_name, namespace, property_key)
+            );",
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(SqlCatalog {
+            // `config` is consumed, so take the name by move instead of cloning.
+            name: config.name,
+            connection: pool,
+            storage: file_io,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+/// One decoded row of the `iceberg_tables` bookkeeping table.
+struct TableRef {
+    /// Namespace levels joined into a single string.
+    table_namespace: String,
+    /// Bare table name within the namespace.
+    table_name: String,
+    /// Location of the table's current metadata JSON file.
+    metadata_location: String,
+    /// Previous metadata location; may be NULL in the database.
+    _previous_metadata_location: Option<String>,
+}
+
+/// Map a result row from `iceberg_tables` into a `TableRef`.
+///
+/// Column order must match the `select` list used by callers:
+/// table_namespace, table_name, metadata_location, previous_metadata_location.
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        // `previous_metadata_location` is nullable: decode it directly as
+        // `Option<String>` instead of decoding a `String` and swallowing the
+        // resulting `ColumnDecode` error — the old approach also masked
+        // genuine decode failures (corrupt data) as NULL.
+        _previous_metadata_location: row.try_get::<Option<String>, _>(3)?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    /// List all namespaces that currently contain at least one table in this
+    /// catalog. Hierarchical parents are not supported yet, so `_parent` is
+    /// ignored.
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        // Bind `self.name` by reference — no clone needed (review feedback).
+        let rows = sqlx::query(
+            "select distinct table_namespace from iceberg_tables where catalog_name = ?;",
+        )
+        .bind(&self.name)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        rows.iter()
+            .map(|row| {
+                let namespace: String = row.try_get(0).map_err(from_sqlx_error)?;
+                // Namespace levels are stored joined with '.'; split them back.
+                NamespaceIdent::from_vec(
+                    namespace.split('.').map(ToString::to_string).collect::<Vec<_>>(),
+                )
+            })
+            .collect()
+    }
+
+    // TODO: namespace creation is not implemented yet.
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    // TODO: namespace lookup is not implemented yet.
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        todo!()
+    }
+
+    // TODO: namespace existence check is not implemented yet.
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
+        todo!()
+    }
+
+    // TODO: namespace property updates are not implemented yet.
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    // TODO: namespace removal is not implemented yet.
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    /// List all tables registered under the given namespace.
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
+        // NOTE(review): this joins namespace levels with '.', while
+        // `table_exists`/`load_table` use `encode_in_url()` — these must
+        // agree or tables written via one path are invisible to the other.
+        // TODO: confirm and unify the encoding.
+        let namespace = namespace.join(".");
+        let rows = sqlx::query(&format!(
+            "select {TABLE_NAMESPACE}, {TABLE_NAME}, {METADATA_LOCATION_PROP}, \
+             {PREVIOUS_METADATA_LOCATION_PROP} from {CATALOG_TABLE_VIEW_NAME} \
+             where {CATALOG_NAME} = ? and {TABLE_NAMESPACE} = ?;"
+        ))
+        // Bind `self.name` by reference — no clone needed.
+        .bind(&self.name)
+        .bind(&namespace)
+        .fetch_all(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        rows.iter()
+            .map(|row| {
+                let table_ref = query_map(row).map_err(from_sqlx_error)?;
+                let namespace = NamespaceIdent::from_vec(
+                    table_ref
+                        .table_namespace
+                        .split('.')
+                        .map(ToString::to_string)
+                        .collect::<Vec<_>>(),
+                )?;
+                Ok(TableIdent::new(namespace, table_ref.table_name))
+            })
+            .collect()
+    }
+
+    /// Check whether the given table is registered in this catalog.
+    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
+        let namespace = identifier.namespace().encode_in_url();
+        // Only existence matters, so fetch at most one row instead of
+        // materializing the full result set with `fetch_all`; the primary
+        // key guarantees at most one match anyway. The clones of
+        // `self.name` and `identifier.name()` were also unnecessary.
+        let row = sqlx::query(&format!(
+            "select {TABLE_NAMESPACE}, {TABLE_NAME}, {METADATA_LOCATION_PROP}, \
+             {PREVIOUS_METADATA_LOCATION_PROP} from {CATALOG_TABLE_VIEW_NAME} \
+             where {CATALOG_NAME} = ? and {TABLE_NAMESPACE} = ? and {TABLE_NAME} = ?;"
+        ))
+        .bind(&self.name)
+        .bind(&namespace)
+        .bind(identifier.name())
+        .fetch_optional(&self.connection)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        Ok(row.is_some())
+    }
+
+    // TODO: table removal is not implemented yet.
+    async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
+        todo!()
+    }
+
+    /// Load a table: look up its current metadata location in the catalog
+    /// database, read and deserialize the metadata JSON from storage, cache
+    /// the (location, metadata) pair, and build a `Table` handle.
+    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
+        let metadata_location = {
+            let namespace = identifier.namespace().encode_in_url();
+            // Bind borrowed values directly; the previous clones of
+            // `self.name` and `identifier.name()` were unnecessary.
+            let row = sqlx::query(&format!(
+                "select {TABLE_NAMESPACE}, {TABLE_NAME}, {METADATA_LOCATION_PROP}, \
+                 {PREVIOUS_METADATA_LOCATION_PROP} from {CATALOG_TABLE_VIEW_NAME} \
+                 where {CATALOG_NAME} = ? and {TABLE_NAMESPACE} = ? and {TABLE_NAME} = ?;"
+            ))
+            .bind(&self.name)
+            .bind(&namespace)
+            .bind(identifier.name())
+            .fetch_one(&self.connection)
+            .await
+            .map_err(from_sqlx_error)?;
+            query_map(&row).map_err(from_sqlx_error)?.metadata_location
+        };
+
+        let file = self.storage.new_input(&metadata_location)?;
+        let mut json = String::new();
+        file.reader().await?.read_to_string(&mut json).await?;
+        let metadata: TableMetadata = serde_json::from_str(&json)?;
+
+        // Remember what we served so a later commit can compare against
+        // this location — presumably for concurrent-update detection;
+        // TODO confirm against the commit path.
+        self.cache
+            .insert(identifier.clone(), (metadata_location, metadata.clone()));
+
+        Ok(Table::builder()
+            .file_io(self.storage.clone())
+            .identifier(identifier.clone())
+            .metadata(metadata)
+            .build())
+    }
+
+    async fn create_table(

Review Comment:
   There are some things missing here:
   1. We should first check that the namespace exists.
   2. The location is optional; when omitted, a subdirectory of the warehouse should be used as the table location.
   
   I would suggest referring to [python's 
implementation](https://github.com/apache/iceberg-python/blob/a892309936effa7ec575195ad3be70193e82d704/pyiceberg/catalog/sql.py#L146).
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to