This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 86efc93  [feat] Support append arrow record batch (#34)
86efc93 is described below

commit 86efc937062816ef726e41726ce16f0179588036
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Oct 21 15:04:50 2025 +0800

    [feat] Support append arrow record batch (#34)
---
 .github/workflows/ci.yml                        |   2 +-
 crates/fluss/src/client/table/append.rs         |  11 +-
 crates/fluss/src/client/write/accumulator.rs    |   4 +-
 crates/fluss/src/client/write/batch.rs          |  22 ++-
 crates/fluss/src/client/write/mod.rs            |  20 ++-
 crates/fluss/src/client/write/sender.rs         |   1 -
 crates/fluss/src/client/write/writer_client.rs  |  33 ++--
 crates/fluss/src/record/arrow.rs                | 211 ++++++++++++++++++------
 crates/fluss/tests/integration/admin.rs         |   8 +-
 crates/fluss/tests/integration/fluss_cluster.rs |  67 +++++---
 crates/fluss/tests/integration/table.rs         | 132 +++++++++++++++
 crates/fluss/tests/integration/utils.rs         |  30 ++++
 crates/fluss/tests/test_fluss.rs                |   3 +
 13 files changed, 443 insertions(+), 101 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 73e2b3f..69625f8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -91,7 +91,7 @@ jobs:
         # only run IT in linux since no docker in macos by default
         run: |
           if [ "$RUNNER_OS" == "Linux" ]; then
-            cargo test --features integration_tests --all-targets --workspace
+            RUST_TEST_THREADS=1 cargo test --features integration_tests 
--all-targets --workspace  -- --nocapture
           fi
         env:
           RUST_LOG: DEBUG
diff --git a/crates/fluss/src/client/table/append.rs 
b/crates/fluss/src/client/table/append.rs
index bf15266..ad3e55e 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -16,12 +16,12 @@
 // under the License.
 
 use crate::client::{WriteRecord, WriterClient};
+use crate::error::Result;
 use crate::metadata::{TableInfo, TablePath};
 use crate::row::GenericRow;
+use arrow::array::RecordBatch;
 use std::sync::Arc;
 
-use crate::error::Result;
-
 #[allow(dead_code)]
 pub struct TableAppend {
     table_path: TablePath,
@@ -63,6 +63,13 @@ impl AppendWriter {
         result_handle.result(result)
     }
 
+    pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
+        let record = WriteRecord::new_record_batch(self.table_path.clone(), 
batch);
+        let result_handle = self.writer_client.send(&record).await?;
+        let result = result_handle.wait().await?;
+        result_handle.result(result)
+    }
+
     pub async fn flush(&self) -> Result<()> {
         self.writer_client.flush().await
     }
diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 32622c7..e4ca957 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -17,7 +17,7 @@
 
 use crate::client::write::batch::WriteBatch::ArrowLog;
 use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch};
-use crate::client::{ResultHandle, WriteRecord};
+use crate::client::{Record, ResultHandle, WriteRecord};
 use crate::cluster::{BucketLocation, Cluster, ServerNode};
 use crate::config::Config;
 use crate::error::Result;
@@ -105,6 +105,7 @@ impl RecordAccumulator {
             row_type,
             bucket_id,
             current_time_ms(),
+            matches!(record.row, Record::RecordBatch(_)),
         ));
 
         let batch_id = batch.batch_id();
@@ -159,7 +160,6 @@ impl RecordAccumulator {
                 true, false, true,
             ));
         }
-
         self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
     }
 
diff --git a/crates/fluss/src/client/write/batch.rs 
b/crates/fluss/src/client/write/batch.rs
index 64c5dd6..13b3d36 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -18,11 +18,10 @@
 use crate::BucketId;
 use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
 use crate::client::{ResultHandle, WriteRecord};
-use crate::metadata::{DataType, TablePath};
-use std::cmp::max;
-
 use crate::error::Result;
+use crate::metadata::{DataType, TablePath};
 use crate::record::MemoryLogRecordsArrowBuilder;
+use std::cmp::max;
 
 #[allow(dead_code)]
 pub struct InnerWriteBatch {
@@ -140,12 +139,16 @@ impl ArrowLogWriteBatch {
         row_type: &DataType,
         bucket_id: BucketId,
         create_ms: i64,
+        to_append_record_batch: bool,
     ) -> Self {
         let base = InnerWriteBatch::new(batch_id, table_path, create_ms, 
bucket_id);
-
         Self {
             write_batch: base,
-            arrow_builder: MemoryLogRecordsArrowBuilder::new(schema_id, 
row_type),
+            arrow_builder: MemoryLogRecordsArrowBuilder::new(
+                schema_id,
+                row_type,
+                to_append_record_batch,
+            ),
         }
     }
 
@@ -157,8 +160,13 @@ impl ArrowLogWriteBatch {
         if self.arrow_builder.is_closed() || self.arrow_builder.is_full() {
             Ok(None)
         } else {
-            self.arrow_builder.append(&write_record.row)?;
-            Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
+            // append() returns true if the record was accepted
+            if self.arrow_builder.append(write_record)? {
+                
Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
+            } else {
+                // append failed
+                Ok(None)
+            }
         }
     }
 
diff --git a/crates/fluss/src/client/write/mod.rs 
b/crates/fluss/src/client/write/mod.rs
index 74df951..e632cde 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -23,6 +23,7 @@ use crate::error::Error;
 use crate::metadata::TablePath;
 use crate::row::GenericRow;
 pub use accumulator::*;
+use arrow::array::RecordBatch;
 use std::sync::Arc;
 
 pub(crate) mod broadcast;
@@ -34,13 +35,28 @@ mod writer_client;
 pub use writer_client::WriterClient;
 
 pub struct WriteRecord<'a> {
-    pub row: GenericRow<'a>,
+    pub row: Record<'a>,
     pub table_path: Arc<TablePath>,
 }
 
+pub enum Record<'a> {
+    Row(GenericRow<'a>),
+    RecordBatch(Arc<RecordBatch>),
+}
+
 impl<'a> WriteRecord<'a> {
     pub fn new(table_path: Arc<TablePath>, row: GenericRow<'a>) -> Self {
-        Self { row, table_path }
+        Self {
+            row: Record::Row(row),
+            table_path,
+        }
+    }
+
+    pub fn new_record_batch(table_path: Arc<TablePath>, row: RecordBatch) -> 
Self {
+        Self {
+            row: Record::RecordBatch(Arc::new(row)),
+            table_path,
+        }
     }
 }
 
diff --git a/crates/fluss/src/client/write/sender.rs 
b/crates/fluss/src/client/write/sender.rs
index e25e2ba..27460e3 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -122,7 +122,6 @@ impl Sender {
         collated: &HashMap<i32, Vec<Arc<ReadyWriteBatch>>>,
     ) -> Result<()> {
         for (leader_id, batches) in collated {
-            println!("send request batch");
             self.send_write_request(*leader_id, self.ack, batches)
                 .await?;
         }
diff --git a/crates/fluss/src/client/write/writer_client.rs 
b/crates/fluss/src/client/write/writer_client.rs
index 01fe289..28f5371 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -90,20 +90,12 @@ impl WriterClient {
         let table_path = &record.table_path;
         let cluster = self.metadata.get_cluster();
 
-        let bucket_assigner = {
-            if let Some(assigner) = self.bucket_assigners.get(table_path) {
-                assigner.clone()
-            } else {
-                let assigner = 
Arc::new(Self::create_bucket_assigner(table_path.as_ref()));
-                self.bucket_assigners
-                    .insert(table_path.as_ref().clone(), assigner.clone());
-                assigner
-            }
-        };
+        let (bucket_assigner, bucket_id) = self.assign_bucket(table_path);
 
-        let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
-
-        let mut result = self.accumulate.append(record, 1, &cluster, 
true).await?;
+        let mut result = self
+            .accumulate
+            .append(record, bucket_id, &cluster, true)
+            .await?;
 
         if result.abort_record_for_new_batch {
             let prev_bucket_id = bucket_id;
@@ -121,6 +113,21 @@ impl WriterClient {
 
         Ok(result.result_handle.expect("result_handle should exist"))
     }
+    fn assign_bucket(&self, table_path: &Arc<TablePath>) -> (Arc<Box<dyn 
BucketAssigner>>, i32) {
+        let cluster = self.metadata.get_cluster();
+        let bucket_assigner = {
+            if let Some(assigner) = self.bucket_assigners.get(table_path) {
+                assigner.clone()
+            } else {
+                let assigner = 
Arc::new(Self::create_bucket_assigner(table_path.as_ref()));
+                self.bucket_assigners
+                    .insert(table_path.as_ref().clone(), assigner.clone());
+                assigner
+            }
+        };
+        let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
+        (bucket_assigner, bucket_id)
+    }
 
     pub async fn close(self) -> Result<()> {
         self.shutdown_tx
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index fa63b00..487f50c 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::client::{Record, WriteRecord};
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::record::{ChangeType, ScanRecord};
+use crate::row::{ColumnarRow, GenericRow};
 use arrow::array::{
     ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder, 
Float64Builder,
     Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder, 
UInt8Builder,
@@ -35,11 +40,6 @@ use std::{
     sync::Arc,
 };
 
-use crate::error::Result;
-use crate::metadata::DataType;
-use crate::record::{ChangeType, ScanRecord};
-use crate::row::{ColumnarRow, GenericRow};
-
 /// const for record batch
 pub const BASE_OFFSET_LENGTH: usize = 8;
 pub const LENGTH_LENGTH: usize = 4;
@@ -95,14 +95,71 @@ pub struct MemoryLogRecordsArrowBuilder {
     magic: u8,
     writer_id: i64,
     batch_sequence: i32,
+    arrow_record_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder>,
+    is_closed: bool,
+}
+
+pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
+    fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>>;
+
+    fn append(&mut self, row: &GenericRow) -> Result<bool>;
+
+    fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool>;
+
+    fn schema(&self) -> SchemaRef;
+
+    fn records_count(&self) -> i32;
+
+    fn is_full(&self) -> bool;
+}
+
+#[derive(Default)]
+pub struct PrebuiltRecordBatchBuilder {
+    arrow_record_batch: Option<Arc<RecordBatch>>,
+    records_count: i32,
+}
+
+impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
+    fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>> {
+        Ok(self.arrow_record_batch.as_ref().unwrap().clone())
+    }
+
+    fn append(&mut self, _row: &GenericRow) -> Result<bool> {
+        // appending a single row is not supported, so return false directly
+        Ok(false)
+    }
+
+    fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool> 
{
+        if self.arrow_record_batch.is_some() {
+            return Ok(false);
+        }
+        self.records_count = record_batch.num_rows() as i32;
+        self.arrow_record_batch = Some(record_batch);
+        Ok(true)
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.arrow_record_batch.as_ref().unwrap().schema()
+    }
+
+    fn records_count(&self) -> i32 {
+        self.records_count
+    }
+
+    fn is_full(&self) -> bool {
+        // full once it already holds a record batch
+        self.arrow_record_batch.is_some()
+    }
+}
+
+pub struct RowAppendRecordBatchBuilder {
     table_schema: SchemaRef,
-    record_count: i32,
     arrow_column_builders: Mutex<Vec<Box<dyn ArrayBuilder>>>,
-    is_closed: bool,
+    records_count: i32,
 }
 
-impl MemoryLogRecordsArrowBuilder {
-    pub fn new(schema_id: i32, row_type: &DataType) -> Self {
+impl RowAppendRecordBatchBuilder {
+    pub fn new(row_type: &DataType) -> Self {
         let schema_ref = to_arrow_schema(row_type);
         let builders = Mutex::new(
             schema_ref
@@ -111,32 +168,106 @@ impl MemoryLogRecordsArrowBuilder {
                 .map(|field| Self::create_builder(field.data_type()))
                 .collect(),
         );
+        Self {
+            table_schema: schema_ref.clone(),
+            arrow_column_builders: builders,
+            records_count: 0,
+        }
+    }
+
+    fn create_builder(data_type: &arrow_schema::DataType) -> Box<dyn 
ArrayBuilder> {
+        match data_type {
+            arrow_schema::DataType::Int8 => Box::new(Int8Builder::new()),
+            arrow_schema::DataType::Int16 => Box::new(Int16Builder::new()),
+            arrow_schema::DataType::Int32 => Box::new(Int32Builder::new()),
+            arrow_schema::DataType::Int64 => Box::new(Int64Builder::new()),
+            arrow_schema::DataType::UInt8 => Box::new(UInt8Builder::new()),
+            arrow_schema::DataType::UInt16 => Box::new(UInt16Builder::new()),
+            arrow_schema::DataType::UInt32 => Box::new(UInt32Builder::new()),
+            arrow_schema::DataType::UInt64 => Box::new(UInt64Builder::new()),
+            arrow_schema::DataType::Float32 => Box::new(Float32Builder::new()),
+            arrow_schema::DataType::Float64 => Box::new(Float64Builder::new()),
+            arrow_schema::DataType::Boolean => Box::new(BooleanBuilder::new()),
+            arrow_schema::DataType::Utf8 => Box::new(StringBuilder::new()),
+            arrow_schema::DataType::Binary => Box::new(BinaryBuilder::new()),
+            dt => panic!("Unsupported data type: {dt:?}"),
+        }
+    }
+}
+
+impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
+    fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>> {
+        let arrays = self
+            .arrow_column_builders
+            .lock()
+            .iter_mut()
+            .map(|b| b.finish())
+            .collect::<Vec<ArrayRef>>();
+        Ok(Arc::new(RecordBatch::try_new(
+            self.table_schema.clone(),
+            arrays,
+        )?))
+    }
+
+    fn append(&mut self, row: &GenericRow) -> Result<bool> {
+        for (idx, value) in row.values.iter().enumerate() {
+            let mut builder_binding = self.arrow_column_builders.lock();
+            let builder = builder_binding.get_mut(idx).unwrap();
+            value.append_to(builder.as_mut())?;
+        }
+        self.records_count += 1;
+        Ok(true)
+    }
+
+    fn append_batch(&mut self, _record_batch: Arc<RecordBatch>) -> 
Result<bool> {
+        Ok(false)
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.table_schema.clone()
+    }
+
+    fn records_count(&self) -> i32 {
+        self.records_count
+    }
+
+    fn is_full(&self) -> bool {
+        self.records_count() >= DEFAULT_MAX_RECORD
+    }
+}
+
+impl MemoryLogRecordsArrowBuilder {
+    pub fn new(schema_id: i32, row_type: &DataType, to_append_record_batch: 
bool) -> Self {
+        let arrow_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder> = {
+            if to_append_record_batch {
+                Box::new(PrebuiltRecordBatchBuilder::default())
+            } else {
+                Box::new(RowAppendRecordBatchBuilder::new(row_type))
+            }
+        };
         MemoryLogRecordsArrowBuilder {
             base_log_offset: BUILDER_DEFAULT_OFFSET,
             schema_id,
             magic: CURRENT_LOG_MAGIC_VALUE,
             writer_id: NO_WRITER_ID,
             batch_sequence: NO_BATCH_SEQUENCE,
-            record_count: 0,
-            table_schema: schema_ref,
-            arrow_column_builders: builders,
             is_closed: false,
+            arrow_record_batch_builder: arrow_batch_builder,
         }
     }
 
-    pub fn append(&mut self, row: &GenericRow) -> Result<()> {
-        for (idx, value) in row.values.iter().enumerate() {
-            let mut builder_binding = self.arrow_column_builders.lock();
-            let builder = builder_binding.get_mut(idx).unwrap();
-            value.append_to(builder.as_mut())?;
+    pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
+        match &record.row {
+            Record::Row(row) => 
Ok(self.arrow_record_batch_builder.append(row)?),
+            Record::RecordBatch(record_batch) => Ok(self
+                .arrow_record_batch_builder
+                .append_batch(record_batch.clone())?),
         }
-        self.record_count += 1;
         // todo: consider write other change type
-        Ok(())
     }
 
     pub fn is_full(&self) -> bool {
-        self.record_count >= DEFAULT_MAX_RECORD
+        self.arrow_record_batch_builder.records_count() >= DEFAULT_MAX_RECORD
     }
 
     pub fn is_closed(&self) -> bool {
@@ -150,18 +281,12 @@ impl MemoryLogRecordsArrowBuilder {
     pub fn build(&self) -> Result<Vec<u8>> {
         // serialize arrow batch
         let mut arrow_batch_bytes = vec![];
-        let mut writer = StreamWriter::try_new(&mut arrow_batch_bytes, 
&self.table_schema)?;
-
-        let arrays = self
-            .arrow_column_builders
-            .lock()
-            .iter_mut()
-            .map(|b| b.finish())
-            .collect::<Vec<ArrayRef>>();
-        let record_batch = RecordBatch::try_new(self.table_schema.clone(), 
arrays)?;
+        let table_schema = self.arrow_record_batch_builder.schema();
+        let mut writer = StreamWriter::try_new(&mut arrow_batch_bytes, 
&table_schema)?;
         // get header len
         let header = writer.get_ref().len();
-        writer.write(&record_batch)?;
+        let record_batch = 
self.arrow_record_batch_builder.build_arrow_record_batch()?;
+        writer.write(record_batch.as_ref())?;
         // get real arrow batch bytes
         let real_arrow_batch_bytes = &arrow_batch_bytes[header..];
 
@@ -195,39 +320,21 @@ impl MemoryLogRecordsArrowBuilder {
         cursor.write_u32::<LittleEndian>(0)?; // crc placeholder
         cursor.write_i16::<LittleEndian>(self.schema_id as i16)?;
 
+        let record_count = self.arrow_record_batch_builder.records_count();
+        // todo: currently, always append-only
         let append_only = true;
         cursor.write_u8(if append_only { 1 } else { 0 })?;
-        cursor.write_i32::<LittleEndian>(if self.record_count > 0 {
-            self.record_count - 1
+        cursor.write_i32::<LittleEndian>(if record_count > 0 {
+            record_count - 1
         } else {
             0
         })?;
 
         cursor.write_i64::<LittleEndian>(self.writer_id)?;
         cursor.write_i32::<LittleEndian>(self.batch_sequence)?;
-        cursor.write_i32::<LittleEndian>(self.record_count)?;
+        cursor.write_i32::<LittleEndian>(record_count)?;
         Ok(())
     }
-
-    fn create_builder(data_type: &arrow_schema::DataType) -> Box<dyn 
ArrayBuilder> {
-        match data_type {
-            arrow_schema::DataType::Int8 => Box::new(Int8Builder::new()),
-            arrow_schema::DataType::Int16 => Box::new(Int16Builder::new()),
-            arrow_schema::DataType::Int32 => Box::new(Int32Builder::new()),
-            arrow_schema::DataType::Int64 => Box::new(Int64Builder::new()),
-            arrow_schema::DataType::UInt8 => Box::new(UInt8Builder::new()),
-            arrow_schema::DataType::UInt16 => Box::new(UInt16Builder::new()),
-            arrow_schema::DataType::UInt32 => Box::new(UInt32Builder::new()),
-            arrow_schema::DataType::UInt64 => Box::new(UInt64Builder::new()),
-            arrow_schema::DataType::Float32 => Box::new(Float32Builder::new()),
-            arrow_schema::DataType::Float64 => Box::new(Float64Builder::new()),
-            arrow_schema::DataType::Boolean => Box::new(BooleanBuilder::new()),
-            arrow_schema::DataType::Utf8 => Box::new(StringBuilder::new()),
-            arrow_schema::DataType::Binary => Box::new(BinaryBuilder::new()),
-            dt => panic!("Unsupported data type: {dt:?}"),
-        }
-    }
 }
 
 pub trait ToArrow {
diff --git a/crates/fluss/tests/integration/admin.rs 
b/crates/fluss/tests/integration/admin.rs
index 0d958a5..c51373d 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -38,20 +38,24 @@ mod admin_test {
         TablePath,
     };
     use std::sync::Arc;
+    use std::thread;
 
     fn before_all() {
         // Create a new tokio runtime in a separate thread
         let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        std::thread::spawn(move || {
+        thread::spawn(move || {
             let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
             rt.block_on(async {
-                let cluster = FlussTestingClusterBuilder::new().build().await;
+                let cluster = 
FlussTestingClusterBuilder::new("test-admin").build().await;
                 let mut guard = cluster_guard.write();
                 *guard = Some(cluster);
             });
         })
         .join()
         .expect("Failed to create cluster");
+        // wait for 20 seconds to avoid errors like
+        // "CoordinatorEventProcessor is not initialized yet"
+        thread::sleep(std::time::Duration::from_secs(20));
     }
 
     fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
diff --git a/crates/fluss/tests/integration/fluss_cluster.rs 
b/crates/fluss/tests/integration/fluss_cluster.rs
index 83a4795..e827e14 100644
--- a/crates/fluss/tests/integration/fluss_cluster.rs
+++ b/crates/fluss/tests/integration/fluss_cluster.rs
@@ -28,13 +28,14 @@ use testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
 const FLUSS_VERSION: &str = "0.7.0";
 
 pub struct FlussTestingClusterBuilder {
-    number_of_tablet_servers: usize,
+    number_of_tablet_servers: i32,
     network: &'static str,
     cluster_conf: HashMap<String, String>,
+    testing_name: String,
 }
 
 impl FlussTestingClusterBuilder {
-    pub fn new() -> Self {
+    pub fn new(testing_name: impl Into<String>) -> Self {
         // reduce testing resources
         let mut cluster_conf = HashMap::new();
         cluster_conf.insert(
@@ -50,14 +51,27 @@ impl FlussTestingClusterBuilder {
             number_of_tablet_servers: 1,
             cluster_conf,
             network: "fluss-cluster-network",
+            testing_name: testing_name.into(),
         }
     }
 
+    fn tablet_server_container_name(&self, server_id: i32) -> String {
+        format!("tablet-server-{}-{}", self.testing_name, server_id)
+    }
+
+    fn coordinator_server_container_name(&self) -> String {
+        format!("coordinator-server-{}", self.testing_name)
+    }
+
+    fn zookeeper_container_name(&self) -> String {
+        format!("zookeeper-{}", self.testing_name)
+    }
+
     pub async fn build(&mut self) -> FlussTestingCluster {
         let zookeeper = Arc::new(
             GenericImage::new("zookeeper", "3.9.2")
                 .with_network(self.network)
-                .with_container_name("zookeeper")
+                .with_container_name(self.zookeeper_container_name())
                 .start()
                 .await
                 .unwrap(),
@@ -83,15 +97,25 @@ impl FlussTestingClusterBuilder {
 
     async fn start_coordinator_server(&mut self) -> 
ContainerAsync<GenericImage> {
         let mut coordinator_confs = HashMap::new();
-        coordinator_confs.insert("zookeeper.address", "zookeeper:2181");
+        coordinator_confs.insert(
+            "zookeeper.address",
+            format!("{}:2181", self.zookeeper_container_name()),
+        );
         coordinator_confs.insert(
             "bind.listeners",
-            "INTERNAL://coordinator-server:0, 
CLIENT://coordinator-server:9123",
+            format!(
+                "INTERNAL://{}:0, CLIENT://{}:9123",
+                self.coordinator_server_container_name(),
+                self.coordinator_server_container_name()
+            ),
         );
-        coordinator_confs.insert("advertised.listeners", 
"CLIENT://localhost:9123");
-        coordinator_confs.insert("internal.listener.name", "INTERNAL");
+        coordinator_confs.insert(
+            "advertised.listeners",
+            "CLIENT://localhost:9123".to_string(),
+        );
+        coordinator_confs.insert("internal.listener.name", 
"INTERNAL".to_string());
         GenericImage::new("fluss/fluss", FLUSS_VERSION)
-            .with_container_name("coordinator-server")
+            .with_container_name(self.coordinator_server_container_name())
             .with_mapped_port(9123, ContainerPort::Tcp(9123))
             .with_network(self.network)
             .with_cmd(vec!["coordinatorServer"])
@@ -104,26 +128,30 @@ impl FlussTestingClusterBuilder {
             .unwrap()
     }
 
-    async fn start_tablet_server(&self, server_id: usize) -> 
ContainerAsync<GenericImage> {
+    async fn start_tablet_server(&self, server_id: i32) -> 
ContainerAsync<GenericImage> {
         let mut tablet_server_confs = HashMap::new();
         let bind_listeners = format!(
-            "INTERNAL://tablet-server-{}:0, CLIENT://tablet-server-{}:9123",
-            server_id, server_id
+            "INTERNAL://{}:0, CLIENT://{}:9123",
+            self.tablet_server_container_name(server_id),
+            self.tablet_server_container_name(server_id),
         );
         let expose_host_port = 9124 + server_id;
         let advertised_listeners = format!("CLIENT://localhost:{}", 
expose_host_port);
         let tablet_server_id = format!("{}", server_id);
-        tablet_server_confs.insert("zookeeper.address", "zookeeper:2181");
-        tablet_server_confs.insert("bind.listeners", bind_listeners.as_str());
-        tablet_server_confs.insert("advertised.listeners", 
advertised_listeners.as_str());
-        tablet_server_confs.insert("internal.listener.name", "INTERNAL");
-        tablet_server_confs.insert("tablet-server.id", 
tablet_server_id.as_str());
+        tablet_server_confs.insert(
+            "zookeeper.address",
+            format!("{}:2181", self.zookeeper_container_name()),
+        );
+        tablet_server_confs.insert("bind.listeners", bind_listeners);
+        tablet_server_confs.insert("advertised.listeners", 
advertised_listeners);
+        tablet_server_confs.insert("internal.listener.name", 
"INTERNAL".to_string());
+        tablet_server_confs.insert("tablet-server.id", tablet_server_id);
 
         GenericImage::new("fluss/fluss", FLUSS_VERSION)
             .with_cmd(vec!["tabletServer"])
             .with_mapped_port(expose_host_port as u16, 
ContainerPort::Tcp(9123))
             .with_network(self.network)
-            .with_container_name(format!("tablet-server-{}", server_id))
+            .with_container_name(self.tablet_server_container_name(server_id))
             .with_env_var(
                 "FLUSS_PROPERTIES",
                 self.to_fluss_properties_with(tablet_server_confs),
@@ -133,7 +161,7 @@ impl FlussTestingClusterBuilder {
             .unwrap()
     }
 
-    fn to_fluss_properties_with(&self, extra_properties: HashMap<&str, &str>) 
-> String {
+    fn to_fluss_properties_with(&self, extra_properties: HashMap<&str, 
String>) -> String {
         let mut fluss_properties = Vec::new();
         for (k, v) in self.cluster_conf.iter() {
             fluss_properties.push(format!("{}: {}", k, v));
@@ -150,7 +178,7 @@ impl FlussTestingClusterBuilder {
 pub struct FlussTestingCluster {
     zookeeper: Arc<ContainerAsync<GenericImage>>,
     coordinator_server: Arc<ContainerAsync<GenericImage>>,
-    tablet_servers: HashMap<usize, Arc<ContainerAsync<GenericImage>>>,
+    tablet_servers: HashMap<i32, Arc<ContainerAsync<GenericImage>>>,
     bootstrap_servers: String,
 }
 
@@ -165,6 +193,7 @@ impl FlussTestingCluster {
 
     pub async fn get_fluss_connection(&self) -> FlussConnection {
         let mut config = Config::default();
+        config.writer_acks = "all".to_string();
         config.bootstrap_server = Some(self.bootstrap_servers.clone());
 
         // Retry mechanism: retry for up to 1 minute
diff --git a/crates/fluss/tests/integration/table.rs 
b/crates/fluss/tests/integration/table.rs
new file mode 100644
index 0000000..a1a6cb2
--- /dev/null
+++ b/crates/fluss/tests/integration/table.rs
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::integration::fluss_cluster::FlussTestingCluster;
+use once_cell::sync::Lazy;
+use parking_lot::RwLock;
+use std::sync::Arc;
+
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
+    Lazy::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod table_test {
+    use super::SHARED_FLUSS_CLUSTER;
+    use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
+    use crate::integration::utils::create_table;
+    use arrow::array::record_batch;
+    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+    use std::sync::Arc;
+    use std::sync::atomic::AtomicUsize;
+    use std::thread;
+    fn before_all() {
+        // Create a new tokio runtime in a separate thread
+        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
+            rt.block_on(async {
+                let cluster = 
FlussTestingClusterBuilder::new("test_table").build().await;
+                let mut guard = cluster_guard.write();
+                *guard = Some(cluster);
+            });
+        })
+        .join()
+        .expect("Failed to create cluster");
+        // wait for 20 seconds to avoid errors like
+        // "CoordinatorEventProcessor is not initialized yet"
+        thread::sleep(std::time::Duration::from_secs(20));
+    }
+
+    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
+        if cluster_guard.is_none() {
+            panic!("Fluss cluster not initialized. Make sure before_all() was 
called.");
+        }
+        Arc::new(cluster_guard.as_ref().unwrap().clone())
+    }
+
+    fn after_all() {
+        // Create a new tokio runtime in a separate thread
+        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
+            rt.block_on(async {
+                let mut guard = cluster_guard.write();
+                if let Some(cluster) = guard.take() {
+                    cluster.stop().await;
+                }
+            });
+        })
+        .join()
+        .expect("Failed to cleanup cluster");
+    }
+
+    #[tokio::test]
+    async fn append_record_batch() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path =
+            TablePath::new("fluss".to_string(), 
"test_append_record_batch".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("c1", DataTypes::int())
+                    .column("c2", DataTypes::string())
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let append_writer = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table")
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer();
+
+        let batch1 =
+            record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2", 
"a3"])).unwrap();
+        append_writer
+            .append_arrow_batch(batch1)
+            .await
+            .expect("Failed to append batch");
+
+        let batch2 =
+            record_batch!(("c1", Int32, [4, 5, 6]), ("c2", Utf8, ["a4", "a5", 
"a6"])).unwrap();
+        append_writer
+            .append_arrow_batch(batch2)
+            .await
+            .expect("Failed to append batch");
+
+        // todo: add scan code to verify the records appended in #30
+    }
+}
diff --git a/crates/fluss/tests/integration/utils.rs 
b/crates/fluss/tests/integration/utils.rs
new file mode 100644
index 0000000..cd1f6cc
--- /dev/null
+++ b/crates/fluss/tests/integration/utils.rs
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use fluss::client::FlussAdmin;
+use fluss::metadata::{TableDescriptor, TablePath};
+
+pub async fn create_table(
+    admin: &FlussAdmin,
+    table_path: &TablePath,
+    table_descriptor: &TableDescriptor,
+) {
+    admin
+        .create_table(&table_path, &table_descriptor, false)
+        .await
+        .expect("Failed to create table");
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index 28b9bef..a15ca23 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -22,4 +22,7 @@ extern crate fluss;
 mod integration {
     mod admin;
     mod fluss_cluster;
+    mod table;
+
+    mod utils;
 }


Reply via email to