liurenjie1024 commented on code in PR #534: URL: https://github.com/apache/iceberg-rust/pull/534#discussion_r1720598113
########## crates/catalog/sql/src/catalog.rs: ########## @@ -167,43 +177,335 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + pub async fn execute( + &self, + query: &str, + args: Vec<Option<&str>>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result<AnyQueryResult> { + let query_with_placeholders = self.replace_placeholders(query); + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result<Vec<NamespaceIdent>> { - todo!() + let table_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + let namespaces_stmt = format!( + "SELECT {NAMESPACE_FIELD_NAME} Review Comment: Ditto. ########## crates/catalog/sql/src/catalog.rs: ########## @@ -167,43 +177,335 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + pub async fn execute( Review Comment: Ditto. 
########## crates/catalog/sql/src/catalog.rs: ########## @@ -167,43 +177,335 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + pub async fn execute( + &self, + query: &str, + args: Vec<Option<&str>>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result<AnyQueryResult> { + let query_with_placeholders = self.replace_placeholders(query); + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result<Vec<NamespaceIdent>> { - todo!() + let table_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} Review Comment: We need to add `DISTINCT` here. 
########## crates/catalog/sql/src/catalog.rs: ########## @@ -167,43 +177,335 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + pub async fn execute( + &self, + query: &str, + args: Vec<Option<&str>>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result<AnyQueryResult> { + let query_with_placeholders = self.replace_placeholders(query); + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result<Vec<NamespaceIdent>> { - todo!() + let table_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + let namespaces_stmt = format!( + "SELECT {NAMESPACE_FIELD_NAME} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + + match parent { + Some(parent) => { + if self.namespace_exists(parent).await? 
{ + let parent_str = parent.join("."); + let parent_table_namespaces_stmt = format!( + "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" + ); + let parent_namespaces_stmt = + format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')"); + + let namespace_rows = self + .fetch_rows( + &format!( + "{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}" + ), + vec![ + Some(&self.name), + Some(&parent_str), + Some(&self.name), + Some(&parent_str), + ], + ) + .await?; + + let mut namespaces = Vec::<NamespaceIdent>::with_capacity(namespace_rows.len()); Review Comment: This should be a set. ########## crates/catalog/sql/src/catalog.rs: ########## @@ -167,43 +177,335 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + pub async fn execute( + &self, + query: &str, + args: Vec<Option<&str>>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result<AnyQueryResult> { + let query_with_placeholders = self.replace_placeholders(query); + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result<Vec<NamespaceIdent>> { - todo!() + let table_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" 
+ ); + let namespaces_stmt = format!( + "SELECT {NAMESPACE_FIELD_NAME} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + + match parent { + Some(parent) => { + if self.namespace_exists(parent).await? { + let parent_str = parent.join("."); + let parent_table_namespaces_stmt = format!( + "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" + ); + let parent_namespaces_stmt = + format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')"); + + let namespace_rows = self + .fetch_rows( + &format!( + "{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}" + ), + vec![ + Some(&self.name), + Some(&parent_str), + Some(&self.name), + Some(&parent_str), + ], + ) + .await?; + + let mut namespaces = Vec::<NamespaceIdent>::with_capacity(namespace_rows.len()); + + for row in namespace_rows { + let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?; + if nsp != parent_str { + namespaces.push(NamespaceIdent::from_strs(nsp.split("."))?); + } + } + + Ok(namespaces) + } else { + no_such_namespace_err(parent) + } + } + None => { + let namespace_rows = self + .fetch_rows( + &format!("{namespaces_stmt} UNION {table_namespaces_stmt}"), + vec![Some(&self.name), Some(&self.name)], + ) + .await?; + + let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len()); + + for row in namespace_rows.iter() { + let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?; + let mut levels = nsp.split(".").collect::<Vec<&str>>(); + if !levels.is_empty() { + let first_level = levels.drain(..1).collect::<Vec<&str>>(); + namespaces.insert(NamespaceIdent::from_strs(first_level)?); + } + } + + Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>()) + } + } } async fn create_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap<String, String>, + namespace: &NamespaceIdent, + properties: HashMap<String, String>, ) -> Result<Namespace> { - todo!() + let exists = 
self.namespace_exists(namespace).await?; + + if exists { + return Err(Error::new( + iceberg::ErrorKind::Unexpected, + format!("Namespace {:?} already exists", namespace), + )); + } + + for i in 1..namespace.len() { + let parent_namespace = NamespaceIdent::from_vec(namespace[..i].to_vec())?; + let parent_exists = self.namespace_exists(&parent_namespace).await?; + if !parent_exists { + return no_such_namespace_err(&parent_namespace); + } + } + + let namespace_str = namespace.join("."); + let insert = format!( + "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) + VALUES (?, ?, ?, ?)"); + if !properties.is_empty() { + let mut query_args = Vec::with_capacity(properties.len() * 4); + let mut properties_insert = insert.clone(); + for (index, (key, value)) in properties.iter().enumerate() { + query_args.extend_from_slice(&[ + Some(self.name.as_str()), + Some(namespace_str.as_str()), + Some(key.as_str()), + Some(value.as_str()), + ]); + if index > 0 { + properties_insert = format!("{properties_insert}, (?, ?, ?, ?)"); + } + } + + self.execute(&properties_insert, query_args, None).await?; + + Ok(Namespace::with_properties(namespace.clone(), properties)) + } else { + // set a default property of exists = true + // up for debate if this is worthwhile + self.execute( + &insert, + vec![ + Some(&self.name), + Some(&namespace_str), + Some("exists"), + Some("true"), + ], + None, + ) + .await?; + Ok(Namespace::with_properties(namespace.clone(), properties)) + } } - async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> { - todo!() + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> { + let exists = self.namespace_exists(namespace).await?; + if exists { + let namespace_props = self + .fetch_rows( + &format!( + "SELECT + {NAMESPACE_FIELD_NAME}, + {NAMESPACE_FIELD_PROPERTY_KEY}, + {NAMESPACE_FIELD_PROPERTY_VALUE} + FROM 
{NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {NAMESPACE_FIELD_NAME} = ?" + ), + vec![Some(&self.name), Some(&namespace.join("."))], + ) + .await?; + + let mut properties = HashMap::with_capacity(namespace_props.len()); + + for row in namespace_props { + let key = row + .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY) + .map_err(from_sqlx_error)?; + let value = row + .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE) + .map_err(from_sqlx_error)?; + + properties.insert(key, value); + } + + Ok(Namespace::with_properties(namespace.clone(), properties)) + } else { + no_such_namespace_err(namespace) + } } - async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> { - todo!() + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> { + let namespace_str = namespace.join("."); + + let table_namespaces = self + .fetch_rows( + &format!( + "SELECT 1 FROM {CATALOG_TABLE_NAME} Review Comment: I think we also need to check the namespace property table? It's possible that a namespace has properties but no table has been created yet. 
########## crates/catalog/sql/src/catalog.rs: ########## @@ -167,43 +177,335 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + pub async fn execute( + &self, + query: &str, + args: Vec<Option<&str>>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result<AnyQueryResult> { + let query_with_placeholders = self.replace_placeholders(query); + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result<Vec<NamespaceIdent>> { - todo!() + let table_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + let namespaces_stmt = format!( + "SELECT {NAMESPACE_FIELD_NAME} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + + match parent { + Some(parent) => { + if self.namespace_exists(parent).await? { + let parent_str = parent.join("."); + let parent_table_namespaces_stmt = format!( + "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" Review Comment: To be honest, I feel this is a little difficult to read. I can understand that you are trying to remove duplicates, but sometimes a little duplicate is worth it if it improves readability. 
########## crates/catalog/sql/src/catalog.rs: ########## @@ -141,21 +142,30 @@ impl SqlCatalog { } /// SQLX Any does not implement PostgresSQL bindings, so we have to do this. - pub async fn execute_statement( - &self, - query: &String, - args: Vec<Option<&String>>, - ) -> Result<Vec<AnyRow>> { - let query_with_placeholders: Cow<str> = - if self.sql_bind_style == SqlBindStyle::DollarNumeric { - let mut query = query.clone(); - for i in 0..args.len() { - query = query.replacen("?", &format!("${}", i + 1), 1); - } - Cow::Owned(query) - } else { - Cow::Borrowed(query) - }; + fn replace_placeholders(&self, query: &str) -> String { + match self.sql_bind_style { + SqlBindStyle::DollarNumeric => { + let mut count = 1; + query + .chars() + .fold(String::with_capacity(query.len()), |mut acc, c| { + if c == '?' { + acc.push('$'); + acc.push_str(&count.to_string()); + count += 1; + } else { + acc.push(c); + } + acc + }) + } + _ => query.to_owned(), + } + } + + /// Fetch a vec of AnyRows from a given query + pub async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> { Review Comment: Do we need to make this public? 
########## crates/catalog/sql/src/catalog.rs: ########## @@ -167,43 +177,335 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + pub async fn execute( + &self, + query: &str, + args: Vec<Option<&str>>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result<AnyQueryResult> { + let query_with_placeholders = self.replace_placeholders(query); + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result<Vec<NamespaceIdent>> { - todo!() + let table_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + let namespaces_stmt = format!( + "SELECT {NAMESPACE_FIELD_NAME} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + + match parent { + Some(parent) => { + if self.namespace_exists(parent).await? 
{ + let parent_str = parent.join("."); + let parent_table_namespaces_stmt = format!( + "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" + ); + let parent_namespaces_stmt = + format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')"); + + let namespace_rows = self + .fetch_rows( + &format!( + "{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}" + ), + vec![ + Some(&self.name), + Some(&parent_str), + Some(&self.name), + Some(&parent_str), + ], + ) + .await?; + + let mut namespaces = Vec::<NamespaceIdent>::with_capacity(namespace_rows.len()); + + for row in namespace_rows { + let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?; + if nsp != parent_str { Review Comment: Should this be `starts_with`? ########## crates/catalog/sql/src/catalog.rs: ########## @@ -167,43 +177,335 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + pub async fn execute( + &self, + query: &str, + args: Vec<Option<&str>>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result<AnyQueryResult> { + let query_with_placeholders = self.replace_placeholders(query); + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result<Vec<NamespaceIdent>> { - todo!() + let table_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" 
+ ); + let namespaces_stmt = format!( + "SELECT {NAMESPACE_FIELD_NAME} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + + match parent { + Some(parent) => { + if self.namespace_exists(parent).await? { + let parent_str = parent.join("."); + let parent_table_namespaces_stmt = format!( + "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" + ); + let parent_namespaces_stmt = + format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')"); + + let namespace_rows = self + .fetch_rows( + &format!( + "{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}" + ), + vec![ + Some(&self.name), + Some(&parent_str), + Some(&self.name), + Some(&parent_str), + ], + ) + .await?; + + let mut namespaces = Vec::<NamespaceIdent>::with_capacity(namespace_rows.len()); + + for row in namespace_rows { + let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?; + if nsp != parent_str { + namespaces.push(NamespaceIdent::from_strs(nsp.split("."))?); + } + } + + Ok(namespaces) + } else { + no_such_namespace_err(parent) + } + } + None => { + let namespace_rows = self + .fetch_rows( + &format!("{namespaces_stmt} UNION {table_namespaces_stmt}"), + vec![Some(&self.name), Some(&self.name)], + ) + .await?; + + let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len()); Review Comment: Ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org