callum-ryan commented on code in PR #534:
URL: https://github.com/apache/iceberg-rust/pull/534#discussion_r1725539258


##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -167,43 +177,312 @@ impl SqlCatalog {
             .await
             .map_err(from_sqlx_error)
     }
+
+    /// Execute statements in a transaction, provided or not
+    async fn execute(
+        &self,
+        query: &str,
+        args: Vec<Option<&str>>,
+        transaction: Option<&mut Transaction<'_, Any>>,
+    ) -> Result<AnyQueryResult> {
+        let query_with_placeholders = self.replace_placeholders(query);
+
+        let mut sqlx_query = sqlx::query(&query_with_placeholders);
+        for arg in args {
+            sqlx_query = sqlx_query.bind(arg);
+        }
+
+        match transaction {
+            Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
+            None => {
+                let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
+                let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
+                let _ = tx.commit().await.map_err(from_sqlx_error);
+                result
+            }
+        }
+    }
 }
 
 #[async_trait]
 impl Catalog for SqlCatalog {
     async fn list_namespaces(
         &self,
-        _parent: Option<&NamespaceIdent>,
+        parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
-        todo!()
+        // UNION will remove duplicates.
+        let all_namespaces_stmt = format!(
+            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
+             FROM {CATALOG_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+             UNION
+             SELECT {NAMESPACE_FIELD_NAME}
+             FROM {NAMESPACE_TABLE_NAME}
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
+        );
+
+        let namespace_rows = self
+            .fetch_rows(&all_namespaces_stmt, vec![
+                Some(&self.name),
+                Some(&self.name),
+            ])
+            .await?;
+
+        let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
+
+        if let Some(parent) = parent {
+            if self.namespace_exists(parent).await? {
+                let parent_str = parent.join(".");
+
+                for row in namespace_rows.iter() {
+                    let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
+                    // if parent = a, then we only want to see a.b, a.c returned.
+                    if nsp != parent_str && nsp.starts_with(&parent_str) {
+                        namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
+                    }
+                }
+
+                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+            } else {
+                no_such_namespace_err(parent)
+            }
+        } else {
+            for row in namespace_rows.iter() {
+                let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
+                let mut levels = nsp.split(".").collect::<Vec<&str>>();
+                if !levels.is_empty() {
+                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
+                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
+                }
+            }
+
+            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
+        }
     }
 
     async fn create_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<Namespace> {
-        todo!()
+        let exists = self.namespace_exists(namespace).await?;
+
+        if exists {
+            return Err(Error::new(
+                iceberg::ErrorKind::Unexpected,
+                format!("Namespace {:?} already exists", namespace),
+            ));
+        }
+
+        for i in 1..namespace.len() {
+            let parent_namespace = NamespaceIdent::from_vec(namespace[..i].to_vec())?;
+            let parent_exists = self.namespace_exists(&parent_namespace).await?;
+            if !parent_exists {
+                return no_such_namespace_err(&parent_namespace);
+            }
+        }
+
+        let namespace_str = namespace.join(".");
+        let insert = format!(
+            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
+             VALUES (?, ?, ?, ?)");
+        if !properties.is_empty() {
+            let mut query_args = Vec::with_capacity(properties.len() * 4);
+            let mut properties_insert = insert.clone();
+            for (index, (key, value)) in properties.iter().enumerate() {
+                query_args.extend_from_slice(&[
+                    Some(self.name.as_str()),
+                    Some(namespace_str.as_str()),
+                    Some(key.as_str()),
+                    Some(value.as_str()),
+                ]);
+                if index > 0 {
+                    properties_insert = format!("{properties_insert}, (?, ?, ?, ?)");
+                }
+            }
+
+            self.execute(&properties_insert, query_args, None).await?;
+
+            Ok(Namespace::with_properties(namespace.clone(), properties))
+        } else {
+            // set a default property of exists = true
+            // up for debate if this is worthwhile
+            self.execute(
+                &insert,
+                vec![
+                    Some(&self.name),
+                    Some(&namespace_str),
+                    Some("exists"),
+                    Some("true"),
+                ],
+                None,
+            )
+            .await?;
+            Ok(Namespace::with_properties(namespace.clone(), properties))
+        }
     }
 
-    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
-        todo!()
+    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
+        let exists = self.namespace_exists(namespace).await?;
+        if exists {
+            let namespace_props = self
+                .fetch_rows(
+                    &format!(
+                        "SELECT
+                            {NAMESPACE_FIELD_NAME},
+                            {NAMESPACE_FIELD_PROPERTY_KEY},
+                            {NAMESPACE_FIELD_PROPERTY_VALUE}
+                            FROM {NAMESPACE_TABLE_NAME}
+                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                            AND {NAMESPACE_FIELD_NAME} = ?"
+                    ),
+                    vec![Some(&self.name), Some(&namespace.join("."))],
+                )
+                .await?;
+
+            let mut properties = HashMap::with_capacity(namespace_props.len());
+
+            for row in namespace_props {
+                let key = row
+                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
+                    .map_err(from_sqlx_error)?;
+                let value = row
+                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
+                    .map_err(from_sqlx_error)?;
+
+                properties.insert(key, value);
+            }
+
+            Ok(Namespace::with_properties(namespace.clone(), properties))
+        } else {
+            no_such_namespace_err(namespace)
+        }
     }
 
-    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
-        todo!()
+    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
+        let namespace_str = namespace.join(".");
+
+        let table_namespaces = self
+            .fetch_rows(
+                &format!(
+                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
+                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                     LIMIT 1"
+                ),
+                vec![Some(&self.name), Some(&namespace_str)],
+            )
+            .await?;
+
+        if !table_namespaces.is_empty() {
+            Ok(true)
+        } else {
+            let namespaces = self
+                .fetch_rows(
+                    &format!(
+                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
+                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                          AND {NAMESPACE_FIELD_NAME} = ?
+                         LIMIT 1"
+                    ),
+                    vec![Some(&self.name), Some(&namespace_str)],
+                )
+                .await?;
+            if !namespaces.is_empty() {
+                Ok(true)
+            } else {
+                Ok(false)
+            }
+        }
     }
 
     async fn update_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<()> {
-        todo!()
+        let exists = self.namespace_exists(namespace).await?;
+        if exists {
+            let existing_properties = self.get_namespace(namespace).await?.properties().clone();
+            let namespace_str = namespace.join(".");
+
+            let mut updates = vec![];
+            let mut inserts = vec![];
+
+            for (key, value) in properties.iter() {
+                if existing_properties.contains_key(key) {
+                    if existing_properties.get(key) != Some(value) {
+                        updates.push((key, value));
+                    }
+                } else {
+                    inserts.push((key, value));
+                }
+            }
+
+            let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
+            let update_stmt = format!(
+                "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
+                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
+                 AND {NAMESPACE_FIELD_NAME} = ?
+                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
+            );
+
+            let insert_stmt = format!(
+                "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
+                 VALUES (?, ?, ?, ?)"
+            );
+
+            for (key, value) in updates {
+                self.execute(
+                    &update_stmt,
+                    vec![
+                        Some(value),
+                        Some(&self.name),
+                        Some(&namespace_str),
+                        Some(key),
+                    ],
+                    Some(&mut tx),
+                )
+                .await?;
+            }
+
+            for (key, value) in inserts {
+                self.execute(
+                    &insert_stmt,
+                    vec![
+                        Some(&self.name),
+                        Some(&namespace_str),
+                        Some(key),
+                        Some(value),
+                    ],
+                    Some(&mut tx),
+                )
+                .await?;
+            }
+
+            let _ = tx.commit().await.map_err(from_sqlx_error)?;
+
+            Ok(())
+        } else {
+            no_such_namespace_err(namespace)
+        }
     }
 
-    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
-        todo!()
+    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+        let exists = self.namespace_exists(namespace).await?;
+        if exists {
+            // TODO: check that the namespace is empty

Review Comment:
   Yes, this functionality will be added with the `list_tables` method I am 
going to implement in the next phase of the SQL Catalog. I'll remove this for 
now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to