liurenjie1024 commented on code in PR #534:
URL: https://github.com/apache/iceberg-rust/pull/534#discussion_r1725114189


##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -167,43 +177,312 @@ impl SqlCatalog {
             .await
             .map_err(from_sqlx_error)
     }
+
+    /// Execute statements in a transaction, provided or not
+    async fn execute(
+        &self,
+        query: &str,
+        args: Vec<Option<&str>>,
+        transaction: Option<&mut Transaction<'_, Any>>,
+    ) -> Result<AnyQueryResult> {
+        let query_with_placeholders = self.replace_placeholders(query);
+
+        let mut sqlx_query = sqlx::query(&query_with_placeholders);
+        for arg in args {
+            sqlx_query = sqlx_query.bind(arg);
+        }
+
+        match transaction {
+            Some(t) => sqlx_query.execute(&mut 
**t).await.map_err(from_sqlx_error),
+            None => {
+                let mut tx = 
self.connection.begin().await.map_err(from_sqlx_error)?;
+                let result = sqlx_query.execute(&mut 
*tx).await.map_err(from_sqlx_error);
+                let _ = tx.commit().await.map_err(from_sqlx_error);
+                result
+            }
+        }
+    }
 }
 
 #[async_trait]
 impl Catalog for SqlCatalog {
     async fn list_namespaces(
         &self,
-        _parent: Option<&NamespaceIdent>,
+        parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
-        todo!()
+        // UNION will remove duplicates.
+        let all_namespaces_stmt = format!(
+            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
+             FROM {CATALOG_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+             UNION
+             SELECT {NAMESPACE_FIELD_NAME}
+             FROM {NAMESPACE_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
+        );
+
+        let namespace_rows = self
+            .fetch_rows(&all_namespaces_stmt, vec![
+                Some(&self.name),
+                Some(&self.name),
+            ])
+            .await?;
+
+        let mut namespaces = 
HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
+
+        if let Some(parent) = parent {
+            if self.namespace_exists(parent).await? {
+                let parent_str = parent.join(".");
+
+                for row in namespace_rows.iter() {
+                    let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                    // if parent = a, then we only want to see a.b, a.c 
returned.
+                    if nsp != parent_str && nsp.starts_with(&parent_str) {
+                        
namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
+                    }
+                }
+
+                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+            } else {
+                no_such_namespace_err(parent)
+            }
+        } else {
+            for row in namespace_rows.iter() {
+                let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                let mut levels = nsp.split(".").collect::<Vec<&str>>();
+                if !levels.is_empty() {
+                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
+                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
+                }
+            }
+
+            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+        }
     }
 
     async fn create_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<Namespace> {
-        todo!()
+        let exists = self.namespace_exists(namespace).await?;
+
+        if exists {
+            return Err(Error::new(
+                iceberg::ErrorKind::Unexpected,
+                format!("Namespace {:?} already exists", namespace),
+            ));
+        }
+
+        for i in 1..namespace.len() {

Review Comment:
   Do we really need this check? I don't see java implementation does this.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -167,43 +177,312 @@ impl SqlCatalog {
             .await
             .map_err(from_sqlx_error)
     }
+
+    /// Execute statements in a transaction, provided or not
+    async fn execute(
+        &self,
+        query: &str,
+        args: Vec<Option<&str>>,
+        transaction: Option<&mut Transaction<'_, Any>>,
+    ) -> Result<AnyQueryResult> {
+        let query_with_placeholders = self.replace_placeholders(query);
+
+        let mut sqlx_query = sqlx::query(&query_with_placeholders);
+        for arg in args {
+            sqlx_query = sqlx_query.bind(arg);
+        }
+
+        match transaction {
+            Some(t) => sqlx_query.execute(&mut 
**t).await.map_err(from_sqlx_error),
+            None => {
+                let mut tx = 
self.connection.begin().await.map_err(from_sqlx_error)?;
+                let result = sqlx_query.execute(&mut 
*tx).await.map_err(from_sqlx_error);
+                let _ = tx.commit().await.map_err(from_sqlx_error);
+                result
+            }
+        }
+    }
 }
 
 #[async_trait]
 impl Catalog for SqlCatalog {
     async fn list_namespaces(
         &self,
-        _parent: Option<&NamespaceIdent>,
+        parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
-        todo!()
+        // UNION will remove duplicates.
+        let all_namespaces_stmt = format!(
+            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
+             FROM {CATALOG_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+             UNION
+             SELECT {NAMESPACE_FIELD_NAME}
+             FROM {NAMESPACE_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
+        );
+
+        let namespace_rows = self
+            .fetch_rows(&all_namespaces_stmt, vec![
+                Some(&self.name),
+                Some(&self.name),
+            ])
+            .await?;
+
+        let mut namespaces = 
HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
+
+        if let Some(parent) = parent {
+            if self.namespace_exists(parent).await? {
+                let parent_str = parent.join(".");
+
+                for row in namespace_rows.iter() {
+                    let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                    // if parent = a, then we only want to see a.b, a.c 
returned.
+                    if nsp != parent_str && nsp.starts_with(&parent_str) {
+                        
namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
+                    }
+                }
+
+                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+            } else {
+                no_such_namespace_err(parent)
+            }
+        } else {
+            for row in namespace_rows.iter() {
+                let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                let mut levels = nsp.split(".").collect::<Vec<&str>>();
+                if !levels.is_empty() {
+                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
+                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
+                }
+            }
+
+            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+        }
     }
 
     async fn create_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<Namespace> {
-        todo!()
+        let exists = self.namespace_exists(namespace).await?;
+
+        if exists {
+            return Err(Error::new(
+                iceberg::ErrorKind::Unexpected,
+                format!("Namespace {:?} already exists", namespace),
+            ));
+        }
+
+        for i in 1..namespace.len() {
+            let parent_namespace = 
NamespaceIdent::from_vec(namespace[..i].to_vec())?;
+            let parent_exists = 
self.namespace_exists(&parent_namespace).await?;
+            if !parent_exists {
+                return no_such_namespace_err(&parent_namespace);
+            }
+        }
+
+        let namespace_str = namespace.join(".");
+        let insert = format!(
+            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, 
{NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, 
{NAMESPACE_FIELD_PROPERTY_VALUE})
+             VALUES (?, ?, ?, ?)");
+        if !properties.is_empty() {
+            let mut query_args = Vec::with_capacity(properties.len() * 4);
+            let mut properties_insert = insert.clone();
+            for (index, (key, value)) in properties.iter().enumerate() {
+                query_args.extend_from_slice(&[
+                    Some(self.name.as_str()),
+                    Some(namespace_str.as_str()),
+                    Some(key.as_str()),
+                    Some(value.as_str()),
+                ]);
+                if index > 0 {
+                    properties_insert = format!("{properties_insert}, (?, ?, 
?, ?)");
+                }
+            }
+
+            self.execute(&properties_insert, query_args, None).await?;
+
+            Ok(Namespace::with_properties(namespace.clone(), properties))
+        } else {
+            // set a default property of exists = true
+            // up for debate if this is worthwhile
+            self.execute(
+                &insert,
+                vec![
+                    Some(&self.name),
+                    Some(&namespace_str),
+                    Some("exists"),
+                    Some("true"),
+                ],
+                None,
+            )
+            .await?;
+            Ok(Namespace::with_properties(namespace.clone(), properties))
+        }
     }
 
-    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
-        todo!()
+    async fn get_namespace(&self, namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        let exists = self.namespace_exists(namespace).await?;
+        if exists {
+            let namespace_props = self
+                .fetch_rows(
+                    &format!(
+                        "SELECT
+                            {NAMESPACE_FIELD_NAME},
+                            {NAMESPACE_FIELD_PROPERTY_KEY},
+                            {NAMESPACE_FIELD_PROPERTY_VALUE}
+                            FROM {NAMESPACE_TABLE_NAME}
+                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                            AND {NAMESPACE_FIELD_NAME} = ?"
+                    ),
+                    vec![Some(&self.name), Some(&namespace.join("."))],
+                )
+                .await?;
+
+            let mut properties = HashMap::with_capacity(namespace_props.len());
+
+            for row in namespace_props {
+                let key = row
+                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
+                    .map_err(from_sqlx_error)?;
+                let value = row
+                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
+                    .map_err(from_sqlx_error)?;
+
+                properties.insert(key, value);
+            }
+
+            Ok(Namespace::with_properties(namespace.clone(), properties))
+        } else {
+            no_such_namespace_err(namespace)
+        }
     }
 
-    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
-        todo!()
+    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> 
Result<bool> {
+        let namespace_str = namespace.join(".");
+
+        let table_namespaces = self
+            .fetch_rows(
+                &format!(
+                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
+                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                     LIMIT 1"
+                ),
+                vec![Some(&self.name), Some(&namespace_str)],
+            )
+            .await?;
+
+        if !table_namespaces.is_empty() {
+            Ok(true)
+        } else {
+            let namespaces = self
+                .fetch_rows(
+                    &format!(
+                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
+                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                          AND {NAMESPACE_FIELD_NAME} = ?
+                         LIMIT 1"
+                    ),
+                    vec![Some(&self.name), Some(&namespace_str)],
+                )
+                .await?;
+            if !namespaces.is_empty() {
+                Ok(true)
+            } else {
+                Ok(false)
+            }
+        }
     }
 
     async fn update_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<()> {
-        todo!()
+        let exists = self.namespace_exists(namespace).await?;
+        if exists {
+            let existing_properties = 
self.get_namespace(namespace).await?.properties().clone();
+            let namespace_str = namespace.join(".");
+
+            let mut updates = vec![];
+            let mut inserts = vec![];
+
+            for (key, value) in properties.iter() {
+                if existing_properties.contains_key(key) {
+                    if existing_properties.get(key) != Some(value) {
+                        updates.push((key, value));
+                    }
+                } else {
+                    inserts.push((key, value));
+                }
+            }
+
+            let mut tx = 
self.connection.begin().await.map_err(from_sqlx_error)?;
+            let update_stmt = format!(
+                "UPDATE {NAMESPACE_TABLE_NAME} SET 
{NAMESPACE_FIELD_PROPERTY_VALUE} = ?
+                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
+                 AND {NAMESPACE_FIELD_NAME} = ?
+                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
+            );
+
+            let insert_stmt = format!(
+                "INSERT INTO {NAMESPACE_TABLE_NAME} 
({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, 
{NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
+                 VALUES (?, ?, ?, ?)"
+            );
+
+            for (key, value) in updates {
+                self.execute(
+                    &update_stmt,
+                    vec![
+                        Some(value),
+                        Some(&self.name),
+                        Some(&namespace_str),
+                        Some(key),
+                    ],
+                    Some(&mut tx),
+                )
+                .await?;
+            }
+
+            for (key, value) in inserts {
+                self.execute(
+                    &insert_stmt,
+                    vec![
+                        Some(&self.name),
+                        Some(&namespace_str),
+                        Some(key),
+                        Some(value),
+                    ],
+                    Some(&mut tx),
+                )
+                .await?;
+            }
+
+            let _ = tx.commit().await.map_err(from_sqlx_error)?;
+
+            Ok(())
+        } else {
+            no_such_namespace_err(namespace)
+        }
     }
 
-    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
-        todo!()
+    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+        let exists = self.namespace_exists(namespace).await?;
+        if exists {
+            // TODO: check that the namespace is empty

Review Comment:
   If we decided to implement checking in following pr, I would sugges to move 
the implementation of this method in following pr. Or please create an issue to 
track this.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -141,21 +142,30 @@ impl SqlCatalog {
     }
 
     /// SQLX Any does not implement PostgresSQL bindings, so we have to do 
this.
-    pub async fn execute_statement(
-        &self,
-        query: &String,
-        args: Vec<Option<&String>>,
-    ) -> Result<Vec<AnyRow>> {
-        let query_with_placeholders: Cow<str> =
-            if self.sql_bind_style == SqlBindStyle::DollarNumeric {
-                let mut query = query.clone();
-                for i in 0..args.len() {
-                    query = query.replacen("?", &format!("${}", i + 1), 1);
-                }
-                Cow::Owned(query)
-            } else {
-                Cow::Borrowed(query)
-            };
+    fn replace_placeholders(&self, query: &str) -> String {
+        match self.sql_bind_style {

Review Comment:
   Asking user to pass `sql_bind_style` would be error prone, could we infer 
this from url?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -167,43 +177,312 @@ impl SqlCatalog {
             .await
             .map_err(from_sqlx_error)
     }
+
+    /// Execute statements in a transaction, provided or not
+    async fn execute(
+        &self,
+        query: &str,
+        args: Vec<Option<&str>>,
+        transaction: Option<&mut Transaction<'_, Any>>,
+    ) -> Result<AnyQueryResult> {
+        let query_with_placeholders = self.replace_placeholders(query);
+
+        let mut sqlx_query = sqlx::query(&query_with_placeholders);
+        for arg in args {
+            sqlx_query = sqlx_query.bind(arg);
+        }
+
+        match transaction {
+            Some(t) => sqlx_query.execute(&mut 
**t).await.map_err(from_sqlx_error),
+            None => {
+                let mut tx = 
self.connection.begin().await.map_err(from_sqlx_error)?;
+                let result = sqlx_query.execute(&mut 
*tx).await.map_err(from_sqlx_error);
+                let _ = tx.commit().await.map_err(from_sqlx_error);
+                result
+            }
+        }
+    }
 }
 
 #[async_trait]
 impl Catalog for SqlCatalog {
     async fn list_namespaces(
         &self,
-        _parent: Option<&NamespaceIdent>,
+        parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
-        todo!()
+        // UNION will remove duplicates.
+        let all_namespaces_stmt = format!(
+            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
+             FROM {CATALOG_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+             UNION
+             SELECT {NAMESPACE_FIELD_NAME}
+             FROM {NAMESPACE_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
+        );
+
+        let namespace_rows = self
+            .fetch_rows(&all_namespaces_stmt, vec![
+                Some(&self.name),
+                Some(&self.name),
+            ])
+            .await?;
+
+        let mut namespaces = 
HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
+
+        if let Some(parent) = parent {
+            if self.namespace_exists(parent).await? {
+                let parent_str = parent.join(".");
+
+                for row in namespace_rows.iter() {
+                    let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                    // if parent = a, then we only want to see a.b, a.c 
returned.
+                    if nsp != parent_str && nsp.starts_with(&parent_str) {
+                        
namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
+                    }
+                }
+
+                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+            } else {
+                no_such_namespace_err(parent)
+            }
+        } else {
+            for row in namespace_rows.iter() {
+                let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                let mut levels = nsp.split(".").collect::<Vec<&str>>();
+                if !levels.is_empty() {
+                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
+                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
+                }
+            }
+
+            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+        }
     }
 
     async fn create_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<Namespace> {
-        todo!()
+        let exists = self.namespace_exists(namespace).await?;
+
+        if exists {
+            return Err(Error::new(
+                iceberg::ErrorKind::Unexpected,
+                format!("Namespace {:?} already exists", namespace),
+            ));
+        }
+
+        for i in 1..namespace.len() {
+            let parent_namespace = 
NamespaceIdent::from_vec(namespace[..i].to_vec())?;
+            let parent_exists = 
self.namespace_exists(&parent_namespace).await?;
+            if !parent_exists {
+                return no_such_namespace_err(&parent_namespace);
+            }
+        }
+
+        let namespace_str = namespace.join(".");
+        let insert = format!(
+            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, 
{NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, 
{NAMESPACE_FIELD_PROPERTY_VALUE})

Review Comment:
   I think this is incorrect? We don't need table name field?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -167,43 +177,312 @@ impl SqlCatalog {
             .await
             .map_err(from_sqlx_error)
     }
+
+    /// Execute statements in a transaction, provided or not
+    async fn execute(
+        &self,
+        query: &str,
+        args: Vec<Option<&str>>,
+        transaction: Option<&mut Transaction<'_, Any>>,
+    ) -> Result<AnyQueryResult> {
+        let query_with_placeholders = self.replace_placeholders(query);
+
+        let mut sqlx_query = sqlx::query(&query_with_placeholders);
+        for arg in args {
+            sqlx_query = sqlx_query.bind(arg);
+        }
+
+        match transaction {
+            Some(t) => sqlx_query.execute(&mut 
**t).await.map_err(from_sqlx_error),
+            None => {
+                let mut tx = 
self.connection.begin().await.map_err(from_sqlx_error)?;
+                let result = sqlx_query.execute(&mut 
*tx).await.map_err(from_sqlx_error);
+                let _ = tx.commit().await.map_err(from_sqlx_error);
+                result
+            }
+        }
+    }
 }
 
 #[async_trait]
 impl Catalog for SqlCatalog {
     async fn list_namespaces(
         &self,
-        _parent: Option<&NamespaceIdent>,
+        parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
-        todo!()
+        // UNION will remove duplicates.
+        let all_namespaces_stmt = format!(
+            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
+             FROM {CATALOG_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+             UNION
+             SELECT {NAMESPACE_FIELD_NAME}
+             FROM {NAMESPACE_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
+        );
+
+        let namespace_rows = self
+            .fetch_rows(&all_namespaces_stmt, vec![
+                Some(&self.name),
+                Some(&self.name),
+            ])
+            .await?;
+
+        let mut namespaces = 
HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
+
+        if let Some(parent) = parent {
+            if self.namespace_exists(parent).await? {
+                let parent_str = parent.join(".");
+
+                for row in namespace_rows.iter() {
+                    let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                    // if parent = a, then we only want to see a.b, a.c 
returned.
+                    if nsp != parent_str && nsp.starts_with(&parent_str) {
+                        
namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
+                    }
+                }
+
+                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+            } else {
+                no_such_namespace_err(parent)
+            }
+        } else {
+            for row in namespace_rows.iter() {
+                let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                let mut levels = nsp.split(".").collect::<Vec<&str>>();
+                if !levels.is_empty() {
+                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
+                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
+                }
+            }
+
+            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+        }
     }
 
     async fn create_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<Namespace> {
-        todo!()
+        let exists = self.namespace_exists(namespace).await?;
+
+        if exists {
+            return Err(Error::new(
+                iceberg::ErrorKind::Unexpected,
+                format!("Namespace {:?} already exists", namespace),
+            ));
+        }
+
+        for i in 1..namespace.len() {
+            let parent_namespace = 
NamespaceIdent::from_vec(namespace[..i].to_vec())?;
+            let parent_exists = 
self.namespace_exists(&parent_namespace).await?;
+            if !parent_exists {
+                return no_such_namespace_err(&parent_namespace);
+            }
+        }
+
+        let namespace_str = namespace.join(".");
+        let insert = format!(
+            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, 
{NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, 
{NAMESPACE_FIELD_PROPERTY_VALUE})
+             VALUES (?, ?, ?, ?)");
+        if !properties.is_empty() {
+            let mut query_args = Vec::with_capacity(properties.len() * 4);
+            let mut properties_insert = insert.clone();
+            for (index, (key, value)) in properties.iter().enumerate() {
+                query_args.extend_from_slice(&[
+                    Some(self.name.as_str()),
+                    Some(namespace_str.as_str()),
+                    Some(key.as_str()),
+                    Some(value.as_str()),
+                ]);
+                if index > 0 {
+                    properties_insert = format!("{properties_insert}, (?, ?, 
?, ?)");

Review Comment:
   We still need to insert `(exists, true)` for non empty properties



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -276,4 +579,385 @@ mod tests {
         new_sql_catalog(warehouse_loc.clone()).await;
         new_sql_catalog(warehouse_loc.clone()).await;
     }
+
+    #[tokio::test]
+    async fn test_list_namespaces_returns_empty_vector() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+
+        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
+    }
+
+    #[tokio::test]
+    async fn test_list_namespaces_returns_multiple_namespaces() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_1 = NamespaceIdent::new("a".into());
+        let namespace_ident_2 = NamespaceIdent::new("b".into());
+        create_namespaces(&catalog, &vec![&namespace_ident_1, 
&namespace_ident_2]).await;
+
+        assert_eq!(
+            to_set(catalog.list_namespaces(None).await.unwrap()),
+            to_set(vec![namespace_ident_1, namespace_ident_2])
+        );
+    }
+
+    #[tokio::test]
+    async fn test_list_namespaces_returns_only_top_level_namespaces() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_1 = NamespaceIdent::new("a".into());
+        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        let namespace_ident_3 = NamespaceIdent::new("b".into());
+        create_namespaces(&catalog, &vec![
+            &namespace_ident_1,
+            &namespace_ident_2,
+            &namespace_ident_3,
+        ])
+        .await;
+
+        assert_eq!(
+            to_set(catalog.list_namespaces(None).await.unwrap()),
+            to_set(vec![namespace_ident_1, namespace_ident_3])
+        );
+    }
+
+    #[tokio::test]
+    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_1 = NamespaceIdent::new("a".into());
+        let namespace_ident_2 = NamespaceIdent::new("b".into());
+        create_namespaces(&catalog, &vec![&namespace_ident_1, 
&namespace_ident_2]).await;
+
+        assert_eq!(
+            catalog
+                .list_namespaces(Some(&namespace_ident_1))
+                .await
+                .unwrap(),
+            vec![]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_list_namespaces_returns_namespace_under_parent() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_1 = NamespaceIdent::new("a".into());
+        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        let namespace_ident_3 = NamespaceIdent::new("c".into());
+        create_namespaces(&catalog, &vec![
+            &namespace_ident_1,
+            &namespace_ident_2,
+            &namespace_ident_3,
+        ])
+        .await;
+
+        assert_eq!(
+            to_set(catalog.list_namespaces(None).await.unwrap()),
+            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
+        );
+
+        assert_eq!(
+            catalog
+                .list_namespaces(Some(&namespace_ident_1))
+                .await
+                .unwrap(),
+            vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
+        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", 
"a"]).unwrap();
+        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", 
"c"]).unwrap();
+        let namespace_ident_5 = NamespaceIdent::new("b".into());
+        create_namespaces(&catalog, &vec![
+            &namespace_ident_1,
+            &namespace_ident_2,
+            &namespace_ident_3,
+            &namespace_ident_4,
+            &namespace_ident_5,
+        ])
+        .await;
+
+        assert_eq!(
+            to_set(
+                catalog
+                    .list_namespaces(Some(&namespace_ident_1))
+                    .await
+                    .unwrap()
+            ),
+            to_set(vec![
+                NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
+                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
+                NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
+            ])
+        );
+    }
+
+    #[tokio::test]
+    async fn test_namespace_exists_returns_false() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        assert!(!catalog
+            .namespace_exists(&NamespaceIdent::new("b".into()))
+            .await
+            .unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_namespace_exists_returns_true() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_create_namespace_with_properties() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("abc".into());
+
+        let mut properties: HashMap<String, String> = HashMap::new();
+        properties.insert("k".into(), "v".into());
+
+        assert_eq!(
+            catalog
+                .create_namespace(&namespace_ident, properties.clone())
+                .await
+                .unwrap(),
+            Namespace::with_properties(namespace_ident.clone(), 
properties.clone())
+        );
+
+        assert_eq!(
+            catalog.get_namespace(&namespace_ident).await.unwrap(),
+            Namespace::with_properties(namespace_ident, properties)
+        );
+    }
+
+    #[tokio::test]
+    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        assert_eq!(
+            catalog
+                .create_namespace(&namespace_ident, HashMap::new())
+                .await
+                .unwrap_err()
+                .to_string(),
+            format!(
+                "Unexpected => Namespace {:?} already exists",
+                &namespace_ident
+            )
+        );
+
+        assert_eq!(
+            catalog.get_namespace(&namespace_ident).await.unwrap(),
+            Namespace::with_properties(namespace_ident, default_properties())
+        );
+    }
+
+    #[tokio::test]
+    async fn test_create_nested_namespace() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let parent_namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &parent_namespace_ident).await;
+
+        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+
+        assert_eq!(
+            catalog
+                .create_namespace(&child_namespace_ident, HashMap::new())
+                .await
+                .unwrap(),
+            Namespace::new(child_namespace_ident.clone())
+        );
+
+        assert_eq!(
+            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
+            Namespace::with_properties(child_namespace_ident, 
default_properties())
+        );
+    }
+
+    #[tokio::test]
+    async fn test_create_deeply_nested_namespace() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_a = NamespaceIdent::new("a".into());
+        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        create_namespaces(&catalog, &vec![&namespace_ident_a, 
&namespace_ident_a_b]).await;
+
+        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", 
"c"]).unwrap();
+
+        assert_eq!(
+            catalog
+                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
+                .await
+                .unwrap(),
+            Namespace::new(namespace_ident_a_b_c.clone())
+        );
+
+        assert_eq!(
+            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
+            Namespace::with_properties(namespace_ident_a_b_c, 
default_properties())
+        );
+    }
+
+    #[tokio::test]
+    async fn 
test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() 
{
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+
+        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+
+        assert_eq!(
+            catalog
+                .create_namespace(&nested_namespace_ident, HashMap::new())
+                .await
+                .unwrap_err()
+                .to_string(),
+            format!(
+                "Unexpected => No such namespace: {:?}",
+                NamespaceIdent::new("a".into())
+            )
+        );
+
+        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
+    }
+
+    #[tokio::test]
+    async fn test_drop_namespace() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("abc".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        catalog.drop_namespace(&namespace_ident).await.unwrap();
+
+        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
+    }
+
+    #[tokio::test]
+    async fn test_drop_nested_namespace() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_a = NamespaceIdent::new("a".into());
+        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        create_namespaces(&catalog, &vec![&namespace_ident_a, 
&namespace_ident_a_b]).await;
+
+        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
+
+        assert!(!catalog
+            .namespace_exists(&namespace_ident_a_b)
+            .await
+            .unwrap());
+
+        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_drop_deeply_nested_namespace() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_a = NamespaceIdent::new("a".into());
+        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", 
"c"]).unwrap();
+        create_namespaces(&catalog, &vec![
+            &namespace_ident_a,
+            &namespace_ident_a_b,
+            &namespace_ident_a_b_c,
+        ])
+        .await;
+
+        catalog
+            .drop_namespace(&namespace_ident_a_b_c)
+            .await
+            .unwrap();
+
+        assert!(!catalog
+            .namespace_exists(&namespace_ident_a_b_c)
+            .await
+            .unwrap());
+
+        assert!(catalog
+            .namespace_exists(&namespace_ident_a_b)
+            .await
+            .unwrap());
+
+        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+
+        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
+        assert_eq!(
+            catalog
+                .drop_namespace(&non_existent_namespace_ident)
+                .await
+                .unwrap_err()
+                .to_string(),
+            format!(
+                "Unexpected => No such namespace: {:?}",
+                non_existent_namespace_ident
+            )
+        )
+    }
+
+    #[tokio::test]
+    async fn 
test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
+
+        let non_existent_namespace_ident =
+            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
+        assert_eq!(
+            catalog
+                .drop_namespace(&non_existent_namespace_ident)
+                .await
+                .unwrap_err()
+                .to_string(),
+            format!(
+                "Unexpected => No such namespace: {:?}",
+                non_existent_namespace_ident
+            )
+        )
+    }
+
+    #[tokio::test]
+    #[ignore = "Java/Python do not drop nested namespaces?"]
+    async fn 
test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {

Review Comment:
   Hmm, should we really take this behavior? 



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -167,43 +177,312 @@ impl SqlCatalog {
             .await
             .map_err(from_sqlx_error)
     }
+
+    /// Execute statements in a transaction, provided or not
+    async fn execute(
+        &self,
+        query: &str,
+        args: Vec<Option<&str>>,
+        transaction: Option<&mut Transaction<'_, Any>>,
+    ) -> Result<AnyQueryResult> {
+        let query_with_placeholders = self.replace_placeholders(query);
+
+        let mut sqlx_query = sqlx::query(&query_with_placeholders);
+        for arg in args {
+            sqlx_query = sqlx_query.bind(arg);
+        }
+
+        match transaction {
+            Some(t) => sqlx_query.execute(&mut 
**t).await.map_err(from_sqlx_error),
+            None => {
+                let mut tx = 
self.connection.begin().await.map_err(from_sqlx_error)?;
+                let result = sqlx_query.execute(&mut 
*tx).await.map_err(from_sqlx_error);
+                let _ = tx.commit().await.map_err(from_sqlx_error);
+                result
+            }
+        }
+    }
 }
 
 #[async_trait]
 impl Catalog for SqlCatalog {
     async fn list_namespaces(
         &self,
-        _parent: Option<&NamespaceIdent>,
+        parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
-        todo!()
+        // UNION will remove duplicates.
+        let all_namespaces_stmt = format!(
+            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
+             FROM {CATALOG_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+             UNION
+             SELECT {NAMESPACE_FIELD_NAME}
+             FROM {NAMESPACE_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
+        );
+
+        let namespace_rows = self
+            .fetch_rows(&all_namespaces_stmt, vec![
+                Some(&self.name),
+                Some(&self.name),
+            ])
+            .await?;
+
+        let mut namespaces = 
HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
+
+        if let Some(parent) = parent {
+            if self.namespace_exists(parent).await? {
+                let parent_str = parent.join(".");
+
+                for row in namespace_rows.iter() {
+                    let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                    // if parent = a, then we only want to see a.b, a.c 
returned.
+                    if nsp != parent_str && nsp.starts_with(&parent_str) {
+                        
namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
+                    }
+                }
+
+                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+            } else {
+                no_such_namespace_err(parent)
+            }
+        } else {
+            for row in namespace_rows.iter() {
+                let nsp = row.try_get::<String, 
_>(0).map_err(from_sqlx_error)?;
+                let mut levels = nsp.split(".").collect::<Vec<&str>>();
+                if !levels.is_empty() {
+                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
+                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
+                }
+            }
+
+            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+        }
     }
 
     async fn create_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<Namespace> {
-        todo!()
+        let exists = self.namespace_exists(namespace).await?;
+
+        if exists {
+            return Err(Error::new(
+                iceberg::ErrorKind::Unexpected,
+                format!("Namespace {:?} already exists", namespace),
+            ));
+        }
+
+        for i in 1..namespace.len() {
+            let parent_namespace = 
NamespaceIdent::from_vec(namespace[..i].to_vec())?;
+            let parent_exists = 
self.namespace_exists(&parent_namespace).await?;
+            if !parent_exists {
+                return no_such_namespace_err(&parent_namespace);
+            }
+        }
+
+        let namespace_str = namespace.join(".");
+        let insert = format!(
+            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, 
{NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, 
{NAMESPACE_FIELD_PROPERTY_VALUE})
+             VALUES (?, ?, ?, ?)");
+        if !properties.is_empty() {
+            let mut query_args = Vec::with_capacity(properties.len() * 4);
+            let mut properties_insert = insert.clone();
+            for (index, (key, value)) in properties.iter().enumerate() {
+                query_args.extend_from_slice(&[
+                    Some(self.name.as_str()),
+                    Some(namespace_str.as_str()),
+                    Some(key.as_str()),
+                    Some(value.as_str()),
+                ]);
+                if index > 0 {
+                    properties_insert = format!("{properties_insert}, (?, ?, 
?, ?)");

Review Comment:
   Though this is correct, I feel this maybe slow since it involves many 
allocations. How about `iter::repeat("(?, ?, ?, 
?)").take(properties.len()).join(", ")`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to