This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4831808  [test] Add IT for table operation in admin (#32)
4831808 is described below

commit 4831808763cb59d2bd6aca1af8c73eddd529c100
Author: Junbo Wang <[email protected]>
AuthorDate: Thu Oct 16 21:02:37 2025 +0800

    [test] Add IT for table operation in admin (#32)
    
    ---------
    
    Co-authored-by: 王俊博(wangjunbo) <[email protected]>
    Co-authored-by: luoyuxia <[email protected]>
---
 crates/fluss/src/metadata/table.rs      |   6 +-
 crates/fluss/tests/integration/admin.rs | 122 +++++++++++++++++++++++++++++++-
 2 files changed, 123 insertions(+), 5 deletions(-)

diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index 2b48ec6..751dd6d 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::fmt::{Display, Formatter};
 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct Column {
     name: String,
     data_type: DataType,
@@ -66,7 +66,7 @@ impl Column {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct PrimaryKey {
     constraint_name: String,
     column_names: Vec<String>,
@@ -90,7 +90,7 @@ impl PrimaryKey {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct Schema {
     columns: Vec<Column>,
     primary_key: Option<PrimaryKey>,
diff --git a/crates/fluss/tests/integration/admin.rs 
b/crates/fluss/tests/integration/admin.rs
index 73f52db..0d958a5 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -33,7 +33,10 @@ static SHARED_FLUSS_CLUSTER: 
Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
 mod admin_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
-    use fluss::metadata::DatabaseDescriptorBuilder;
+    use fluss::metadata::{
+        DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, 
TableDescriptor,
+        TablePath,
+    };
     use std::sync::Arc;
 
     fn before_all() {
@@ -126,6 +129,121 @@ mod admin_test {
 
     #[tokio::test]
     async fn test_create_table() {
-        // todo
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection
+            .get_admin()
+            .await
+            .expect("Failed to get admin client");
+
+        let test_db_name = "test_create_table_db";
+        let db_descriptor = DatabaseDescriptorBuilder::default()
+            .comment("Database for test_create_table")
+            .build();
+
+        assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
+        admin
+            .create_database(test_db_name, false, Some(&db_descriptor))
+            .await
+            .expect("Failed to create test database");
+
+        let test_table_name = "test_user_table";
+        let table_path = TablePath::new(test_db_name.to_string(), 
test_table_name.to_string());
+
+        // build table schema
+        let table_schema = Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .column("age", DataTypes::int())
+            .with_comment("User's age (optional)")
+            .column("email", DataTypes::string())
+            .primary_key(vec!["id".to_string()])
+            .build()
+            .expect("Failed to build table schema");
+
+        // build table descriptor
+        let table_descriptor = TableDescriptor::builder()
+            .schema(table_schema.clone())
+            .comment("Test table for user data (id, name, age, email)")
+            .distributed_by(Some(3), vec!["id".to_string()])
+            .property("table.replication.factor", "1")
+            .log_format(LogFormat::ARROW)
+            .kv_format(KvFormat::INDEXED)
+            .build()
+            .expect("Failed to build table descriptor");
+
+        // create test table
+        admin
+            .create_table(&table_path, &table_descriptor, false)
+            .await
+            .expect("Failed to create test table");
+
+        assert!(
+            admin.table_exists(&table_path).await.unwrap(),
+            "Table {:?} should exist after creation",
+            table_path
+        );
+
+        let tables = admin.list_tables(test_db_name).await.unwrap();
+        assert_eq!(
+            tables.len(),
+            1,
+            "There should be exactly one table in the database"
+        );
+        assert!(
+            tables.contains(&test_table_name.to_string()),
+            "Table list should contain the created table"
+        );
+
+        let table_info = admin
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table info");
+
+        // verify table comment
+        assert_eq!(
+            table_info.get_comment(),
+            Some("Test table for user data (id, name, age, email)"),
+            "Table comment mismatch"
+        );
+
+        // verify schema columns
+        let actual_schema = table_info.get_schema();
+        assert_eq!(actual_schema, table_descriptor.schema(), "Schema 
mismatch");
+
+        // verify primary key
+        assert_eq!(
+            table_info.get_primary_keys(),
+            &vec!["id".to_string()],
+            "Primary key columns mismatch"
+        );
+
+        // verify distribution and properties
+        assert_eq!(table_info.get_num_buckets(), 3, "Bucket count mismatch");
+        assert_eq!(
+            table_info.get_bucket_keys(),
+            &vec!["id".to_string()],
+            "Bucket keys mismatch"
+        );
+
+        assert_eq!(
+            table_info.get_properties(),
+            table_descriptor.properties(),
+            "Properties mismatch"
+        );
+
+        // drop table
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+        // table shouldn't exist now
+        assert_eq!(admin.table_exists(&table_path).await.unwrap(), false);
+
+        // drop database
+        admin.drop_database(test_db_name, false, true).await;
+
+        // database shouldn't exist now
+        assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
     }
 }

Reply via email to