This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 86efc93 [feat] Support append arrow record batch (#34)
86efc93 is described below
commit 86efc937062816ef726e41726ce16f0179588036
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Oct 21 15:04:50 2025 +0800
[feat] Support append arrow record batch (#34)
---
.github/workflows/ci.yml | 2 +-
crates/fluss/src/client/table/append.rs | 11 +-
crates/fluss/src/client/write/accumulator.rs | 4 +-
crates/fluss/src/client/write/batch.rs | 22 ++-
crates/fluss/src/client/write/mod.rs | 20 ++-
crates/fluss/src/client/write/sender.rs | 1 -
crates/fluss/src/client/write/writer_client.rs | 33 ++--
crates/fluss/src/record/arrow.rs | 211 ++++++++++++++++++------
crates/fluss/tests/integration/admin.rs | 8 +-
crates/fluss/tests/integration/fluss_cluster.rs | 67 +++++---
crates/fluss/tests/integration/table.rs | 132 +++++++++++++++
crates/fluss/tests/integration/utils.rs | 30 ++++
crates/fluss/tests/test_fluss.rs | 3 +
13 files changed, 443 insertions(+), 101 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 73e2b3f..69625f8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -91,7 +91,7 @@ jobs:
# only run IT in linux since no docker in macos by default
run: |
if [ "$RUNNER_OS" == "Linux" ]; then
- cargo test --features integration_tests --all-targets --workspace
+ RUST_TEST_THREADS=1 cargo test --features integration_tests
--all-targets --workspace -- --nocapture
fi
env:
RUST_LOG: DEBUG
diff --git a/crates/fluss/src/client/table/append.rs
b/crates/fluss/src/client/table/append.rs
index bf15266..ad3e55e 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -16,12 +16,12 @@
// under the License.
use crate::client::{WriteRecord, WriterClient};
+use crate::error::Result;
use crate::metadata::{TableInfo, TablePath};
use crate::row::GenericRow;
+use arrow::array::RecordBatch;
use std::sync::Arc;
-use crate::error::Result;
-
#[allow(dead_code)]
pub struct TableAppend {
table_path: TablePath,
@@ -63,6 +63,13 @@ impl AppendWriter {
result_handle.result(result)
}
+ pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
+ let record = WriteRecord::new_record_batch(self.table_path.clone(),
batch);
+ let result_handle = self.writer_client.send(&record).await?;
+ let result = result_handle.wait().await?;
+ result_handle.result(result)
+ }
+
pub async fn flush(&self) -> Result<()> {
self.writer_client.flush().await
}
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 32622c7..e4ca957 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -17,7 +17,7 @@
use crate::client::write::batch::WriteBatch::ArrowLog;
use crate::client::write::batch::{ArrowLogWriteBatch, WriteBatch};
-use crate::client::{ResultHandle, WriteRecord};
+use crate::client::{Record, ResultHandle, WriteRecord};
use crate::cluster::{BucketLocation, Cluster, ServerNode};
use crate::config::Config;
use crate::error::Result;
@@ -105,6 +105,7 @@ impl RecordAccumulator {
row_type,
bucket_id,
current_time_ms(),
+ matches!(record.row, Record::RecordBatch(_)),
));
let batch_id = batch.batch_id();
@@ -159,7 +160,6 @@ impl RecordAccumulator {
true, false, true,
));
}
-
self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
}
diff --git a/crates/fluss/src/client/write/batch.rs
b/crates/fluss/src/client/write/batch.rs
index 64c5dd6..13b3d36 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -18,11 +18,10 @@
use crate::BucketId;
use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
use crate::client::{ResultHandle, WriteRecord};
-use crate::metadata::{DataType, TablePath};
-use std::cmp::max;
-
use crate::error::Result;
+use crate::metadata::{DataType, TablePath};
use crate::record::MemoryLogRecordsArrowBuilder;
+use std::cmp::max;
#[allow(dead_code)]
pub struct InnerWriteBatch {
@@ -140,12 +139,16 @@ impl ArrowLogWriteBatch {
row_type: &DataType,
bucket_id: BucketId,
create_ms: i64,
+ to_append_record_batch: bool,
) -> Self {
let base = InnerWriteBatch::new(batch_id, table_path, create_ms,
bucket_id);
-
Self {
write_batch: base,
- arrow_builder: MemoryLogRecordsArrowBuilder::new(schema_id,
row_type),
+ arrow_builder: MemoryLogRecordsArrowBuilder::new(
+ schema_id,
+ row_type,
+ to_append_record_batch,
+ ),
}
}
@@ -157,8 +160,13 @@ impl ArrowLogWriteBatch {
if self.arrow_builder.is_closed() || self.arrow_builder.is_full() {
Ok(None)
} else {
- self.arrow_builder.append(&write_record.row)?;
- Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
+            // appended successfully
+ if self.arrow_builder.append(write_record)? {
+
Ok(Some(ResultHandle::new(self.write_batch.results.receiver())))
+ } else {
+                // append failed
+ Ok(None)
+ }
}
}
diff --git a/crates/fluss/src/client/write/mod.rs
b/crates/fluss/src/client/write/mod.rs
index 74df951..e632cde 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -23,6 +23,7 @@ use crate::error::Error;
use crate::metadata::TablePath;
use crate::row::GenericRow;
pub use accumulator::*;
+use arrow::array::RecordBatch;
use std::sync::Arc;
pub(crate) mod broadcast;
@@ -34,13 +35,28 @@ mod writer_client;
pub use writer_client::WriterClient;
pub struct WriteRecord<'a> {
- pub row: GenericRow<'a>,
+ pub row: Record<'a>,
pub table_path: Arc<TablePath>,
}
+pub enum Record<'a> {
+ Row(GenericRow<'a>),
+ RecordBatch(Arc<RecordBatch>),
+}
+
impl<'a> WriteRecord<'a> {
pub fn new(table_path: Arc<TablePath>, row: GenericRow<'a>) -> Self {
- Self { row, table_path }
+ Self {
+ row: Record::Row(row),
+ table_path,
+ }
+ }
+
+ pub fn new_record_batch(table_path: Arc<TablePath>, row: RecordBatch) ->
Self {
+ Self {
+ row: Record::RecordBatch(Arc::new(row)),
+ table_path,
+ }
}
}
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index e25e2ba..27460e3 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -122,7 +122,6 @@ impl Sender {
collated: &HashMap<i32, Vec<Arc<ReadyWriteBatch>>>,
) -> Result<()> {
for (leader_id, batches) in collated {
- println!("send request batch");
self.send_write_request(*leader_id, self.ack, batches)
.await?;
}
diff --git a/crates/fluss/src/client/write/writer_client.rs
b/crates/fluss/src/client/write/writer_client.rs
index 01fe289..28f5371 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -90,20 +90,12 @@ impl WriterClient {
let table_path = &record.table_path;
let cluster = self.metadata.get_cluster();
- let bucket_assigner = {
- if let Some(assigner) = self.bucket_assigners.get(table_path) {
- assigner.clone()
- } else {
- let assigner =
Arc::new(Self::create_bucket_assigner(table_path.as_ref()));
- self.bucket_assigners
- .insert(table_path.as_ref().clone(), assigner.clone());
- assigner
- }
- };
+ let (bucket_assigner, bucket_id) = self.assign_bucket(table_path);
- let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
-
- let mut result = self.accumulate.append(record, 1, &cluster,
true).await?;
+ let mut result = self
+ .accumulate
+ .append(record, bucket_id, &cluster, true)
+ .await?;
if result.abort_record_for_new_batch {
let prev_bucket_id = bucket_id;
@@ -121,6 +113,21 @@ impl WriterClient {
Ok(result.result_handle.expect("result_handle should exist"))
}
+ fn assign_bucket(&self, table_path: &Arc<TablePath>) -> (Arc<Box<dyn
BucketAssigner>>, i32) {
+ let cluster = self.metadata.get_cluster();
+ let bucket_assigner = {
+ if let Some(assigner) = self.bucket_assigners.get(table_path) {
+ assigner.clone()
+ } else {
+ let assigner =
Arc::new(Self::create_bucket_assigner(table_path.as_ref()));
+ self.bucket_assigners
+ .insert(table_path.as_ref().clone(), assigner.clone());
+ assigner
+ }
+ };
+ let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
+ (bucket_assigner, bucket_id)
+ }
pub async fn close(self) -> Result<()> {
self.shutdown_tx
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index fa63b00..487f50c 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client::{Record, WriteRecord};
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::record::{ChangeType, ScanRecord};
+use crate::row::{ColumnarRow, GenericRow};
use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder,
Float64Builder,
Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder,
UInt8Builder,
@@ -35,11 +40,6 @@ use std::{
sync::Arc,
};
-use crate::error::Result;
-use crate::metadata::DataType;
-use crate::record::{ChangeType, ScanRecord};
-use crate::row::{ColumnarRow, GenericRow};
-
/// const for record batch
pub const BASE_OFFSET_LENGTH: usize = 8;
pub const LENGTH_LENGTH: usize = 4;
@@ -95,14 +95,71 @@ pub struct MemoryLogRecordsArrowBuilder {
magic: u8,
writer_id: i64,
batch_sequence: i32,
+ arrow_record_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder>,
+ is_closed: bool,
+}
+
+pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
+ fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>>;
+
+ fn append(&mut self, row: &GenericRow) -> Result<bool>;
+
+ fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool>;
+
+ fn schema(&self) -> SchemaRef;
+
+ fn records_count(&self) -> i32;
+
+ fn is_full(&self) -> bool;
+}
+
+#[derive(Default)]
+pub struct PrebuiltRecordBatchBuilder {
+ arrow_record_batch: Option<Arc<RecordBatch>>,
+ records_count: i32,
+}
+
+impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
+ fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>> {
+ Ok(self.arrow_record_batch.as_ref().unwrap().clone())
+ }
+
+ fn append(&mut self, _row: &GenericRow) -> Result<bool> {
+ // append one single row is not supported, return false directly
+ Ok(false)
+ }
+
+ fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool>
{
+ if self.arrow_record_batch.is_some() {
+ return Ok(false);
+ }
+ self.records_count = record_batch.num_rows() as i32;
+ self.arrow_record_batch = Some(record_batch);
+ Ok(true)
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.arrow_record_batch.as_ref().unwrap().schema()
+ }
+
+ fn records_count(&self) -> i32 {
+ self.records_count
+ }
+
+ fn is_full(&self) -> bool {
+ // full if has one record batch
+ self.arrow_record_batch.is_some()
+ }
+}
+
+pub struct RowAppendRecordBatchBuilder {
table_schema: SchemaRef,
- record_count: i32,
arrow_column_builders: Mutex<Vec<Box<dyn ArrayBuilder>>>,
- is_closed: bool,
+ records_count: i32,
}
-impl MemoryLogRecordsArrowBuilder {
- pub fn new(schema_id: i32, row_type: &DataType) -> Self {
+impl RowAppendRecordBatchBuilder {
+ pub fn new(row_type: &DataType) -> Self {
let schema_ref = to_arrow_schema(row_type);
let builders = Mutex::new(
schema_ref
@@ -111,32 +168,106 @@ impl MemoryLogRecordsArrowBuilder {
.map(|field| Self::create_builder(field.data_type()))
.collect(),
);
+ Self {
+ table_schema: schema_ref.clone(),
+ arrow_column_builders: builders,
+ records_count: 0,
+ }
+ }
+
+ fn create_builder(data_type: &arrow_schema::DataType) -> Box<dyn
ArrayBuilder> {
+ match data_type {
+ arrow_schema::DataType::Int8 => Box::new(Int8Builder::new()),
+ arrow_schema::DataType::Int16 => Box::new(Int16Builder::new()),
+ arrow_schema::DataType::Int32 => Box::new(Int32Builder::new()),
+ arrow_schema::DataType::Int64 => Box::new(Int64Builder::new()),
+ arrow_schema::DataType::UInt8 => Box::new(UInt8Builder::new()),
+ arrow_schema::DataType::UInt16 => Box::new(UInt16Builder::new()),
+ arrow_schema::DataType::UInt32 => Box::new(UInt32Builder::new()),
+ arrow_schema::DataType::UInt64 => Box::new(UInt64Builder::new()),
+ arrow_schema::DataType::Float32 => Box::new(Float32Builder::new()),
+ arrow_schema::DataType::Float64 => Box::new(Float64Builder::new()),
+ arrow_schema::DataType::Boolean => Box::new(BooleanBuilder::new()),
+ arrow_schema::DataType::Utf8 => Box::new(StringBuilder::new()),
+ arrow_schema::DataType::Binary => Box::new(BinaryBuilder::new()),
+ dt => panic!("Unsupported data type: {dt:?}"),
+ }
+ }
+}
+
+impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
+ fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>> {
+ let arrays = self
+ .arrow_column_builders
+ .lock()
+ .iter_mut()
+ .map(|b| b.finish())
+ .collect::<Vec<ArrayRef>>();
+ Ok(Arc::new(RecordBatch::try_new(
+ self.table_schema.clone(),
+ arrays,
+ )?))
+ }
+
+ fn append(&mut self, row: &GenericRow) -> Result<bool> {
+ for (idx, value) in row.values.iter().enumerate() {
+ let mut builder_binding = self.arrow_column_builders.lock();
+ let builder = builder_binding.get_mut(idx).unwrap();
+ value.append_to(builder.as_mut())?;
+ }
+ self.records_count += 1;
+ Ok(true)
+ }
+
+ fn append_batch(&mut self, _record_batch: Arc<RecordBatch>) ->
Result<bool> {
+ Ok(false)
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.table_schema.clone()
+ }
+
+ fn records_count(&self) -> i32 {
+ self.records_count
+ }
+
+ fn is_full(&self) -> bool {
+ self.records_count() >= DEFAULT_MAX_RECORD
+ }
+}
+
+impl MemoryLogRecordsArrowBuilder {
+ pub fn new(schema_id: i32, row_type: &DataType, to_append_record_batch:
bool) -> Self {
+ let arrow_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder> = {
+ if to_append_record_batch {
+ Box::new(PrebuiltRecordBatchBuilder::default())
+ } else {
+ Box::new(RowAppendRecordBatchBuilder::new(row_type))
+ }
+ };
MemoryLogRecordsArrowBuilder {
base_log_offset: BUILDER_DEFAULT_OFFSET,
schema_id,
magic: CURRENT_LOG_MAGIC_VALUE,
writer_id: NO_WRITER_ID,
batch_sequence: NO_BATCH_SEQUENCE,
- record_count: 0,
- table_schema: schema_ref,
- arrow_column_builders: builders,
is_closed: false,
+ arrow_record_batch_builder: arrow_batch_builder,
}
}
- pub fn append(&mut self, row: &GenericRow) -> Result<()> {
- for (idx, value) in row.values.iter().enumerate() {
- let mut builder_binding = self.arrow_column_builders.lock();
- let builder = builder_binding.get_mut(idx).unwrap();
- value.append_to(builder.as_mut())?;
+ pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
+ match &record.row {
+ Record::Row(row) =>
Ok(self.arrow_record_batch_builder.append(row)?),
+ Record::RecordBatch(record_batch) => Ok(self
+ .arrow_record_batch_builder
+ .append_batch(record_batch.clone())?),
}
- self.record_count += 1;
// todo: consider write other change type
- Ok(())
}
pub fn is_full(&self) -> bool {
- self.record_count >= DEFAULT_MAX_RECORD
+ self.arrow_record_batch_builder.records_count() >= DEFAULT_MAX_RECORD
}
pub fn is_closed(&self) -> bool {
@@ -150,18 +281,12 @@ impl MemoryLogRecordsArrowBuilder {
pub fn build(&self) -> Result<Vec<u8>> {
// serialize arrow batch
let mut arrow_batch_bytes = vec![];
- let mut writer = StreamWriter::try_new(&mut arrow_batch_bytes,
&self.table_schema)?;
-
- let arrays = self
- .arrow_column_builders
- .lock()
- .iter_mut()
- .map(|b| b.finish())
- .collect::<Vec<ArrayRef>>();
- let record_batch = RecordBatch::try_new(self.table_schema.clone(),
arrays)?;
+ let table_schema = self.arrow_record_batch_builder.schema();
+ let mut writer = StreamWriter::try_new(&mut arrow_batch_bytes,
&table_schema)?;
// get header len
let header = writer.get_ref().len();
- writer.write(&record_batch)?;
+ let record_batch =
self.arrow_record_batch_builder.build_arrow_record_batch()?;
+ writer.write(record_batch.as_ref())?;
// get real arrow batch bytes
let real_arrow_batch_bytes = &arrow_batch_bytes[header..];
@@ -195,39 +320,21 @@ impl MemoryLogRecordsArrowBuilder {
cursor.write_u32::<LittleEndian>(0)?; // crc placeholder
cursor.write_i16::<LittleEndian>(self.schema_id as i16)?;
+ let record_count = self.arrow_record_batch_builder.records_count();
        // todo: currently, always is append-only
let append_only = true;
cursor.write_u8(if append_only { 1 } else { 0 })?;
- cursor.write_i32::<LittleEndian>(if self.record_count > 0 {
- self.record_count - 1
+ cursor.write_i32::<LittleEndian>(if record_count > 0 {
+ record_count - 1
} else {
0
})?;
cursor.write_i64::<LittleEndian>(self.writer_id)?;
cursor.write_i32::<LittleEndian>(self.batch_sequence)?;
- cursor.write_i32::<LittleEndian>(self.record_count)?;
+ cursor.write_i32::<LittleEndian>(record_count)?;
Ok(())
}
-
- fn create_builder(data_type: &arrow_schema::DataType) -> Box<dyn
ArrayBuilder> {
- match data_type {
- arrow_schema::DataType::Int8 => Box::new(Int8Builder::new()),
- arrow_schema::DataType::Int16 => Box::new(Int16Builder::new()),
- arrow_schema::DataType::Int32 => Box::new(Int32Builder::new()),
- arrow_schema::DataType::Int64 => Box::new(Int64Builder::new()),
- arrow_schema::DataType::UInt8 => Box::new(UInt8Builder::new()),
- arrow_schema::DataType::UInt16 => Box::new(UInt16Builder::new()),
- arrow_schema::DataType::UInt32 => Box::new(UInt32Builder::new()),
- arrow_schema::DataType::UInt64 => Box::new(UInt64Builder::new()),
- arrow_schema::DataType::Float32 => Box::new(Float32Builder::new()),
- arrow_schema::DataType::Float64 => Box::new(Float64Builder::new()),
- arrow_schema::DataType::Boolean => Box::new(BooleanBuilder::new()),
- arrow_schema::DataType::Utf8 => Box::new(StringBuilder::new()),
- arrow_schema::DataType::Binary => Box::new(BinaryBuilder::new()),
- dt => panic!("Unsupported data type: {dt:?}"),
- }
- }
}
pub trait ToArrow {
diff --git a/crates/fluss/tests/integration/admin.rs
b/crates/fluss/tests/integration/admin.rs
index 0d958a5..c51373d 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -38,20 +38,24 @@ mod admin_test {
TablePath,
};
use std::sync::Arc;
+ use std::thread;
fn before_all() {
// Create a new tokio runtime in a separate thread
let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
- std::thread::spawn(move || {
+ thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
rt.block_on(async {
- let cluster = FlussTestingClusterBuilder::new().build().await;
+ let cluster =
FlussTestingClusterBuilder::new("test-admin").build().await;
let mut guard = cluster_guard.write();
*guard = Some(cluster);
});
})
.join()
.expect("Failed to create cluster");
+ // wait for 20 seconds to avoid the error like
+ // CoordinatorEventProcessor is not initialized yet
+ thread::sleep(std::time::Duration::from_secs(20));
}
fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
diff --git a/crates/fluss/tests/integration/fluss_cluster.rs
b/crates/fluss/tests/integration/fluss_cluster.rs
index 83a4795..e827e14 100644
--- a/crates/fluss/tests/integration/fluss_cluster.rs
+++ b/crates/fluss/tests/integration/fluss_cluster.rs
@@ -28,13 +28,14 @@ use testcontainers::{ContainerAsync, GenericImage,
ImageExt};
const FLUSS_VERSION: &str = "0.7.0";
pub struct FlussTestingClusterBuilder {
- number_of_tablet_servers: usize,
+ number_of_tablet_servers: i32,
network: &'static str,
cluster_conf: HashMap<String, String>,
+ testing_name: String,
}
impl FlussTestingClusterBuilder {
- pub fn new() -> Self {
+ pub fn new(testing_name: impl Into<String>) -> Self {
// reduce testing resources
let mut cluster_conf = HashMap::new();
cluster_conf.insert(
@@ -50,14 +51,27 @@ impl FlussTestingClusterBuilder {
number_of_tablet_servers: 1,
cluster_conf,
network: "fluss-cluster-network",
+ testing_name: testing_name.into(),
}
}
+ fn tablet_server_container_name(&self, server_id: i32) -> String {
+ format!("tablet-server-{}-{}", self.testing_name, server_id)
+ }
+
+ fn coordinator_server_container_name(&self) -> String {
+ format!("coordinator-server-{}", self.testing_name)
+ }
+
+ fn zookeeper_container_name(&self) -> String {
+ format!("zookeeper-{}", self.testing_name)
+ }
+
pub async fn build(&mut self) -> FlussTestingCluster {
let zookeeper = Arc::new(
GenericImage::new("zookeeper", "3.9.2")
.with_network(self.network)
- .with_container_name("zookeeper")
+ .with_container_name(self.zookeeper_container_name())
.start()
.await
.unwrap(),
@@ -83,15 +97,25 @@ impl FlussTestingClusterBuilder {
async fn start_coordinator_server(&mut self) ->
ContainerAsync<GenericImage> {
let mut coordinator_confs = HashMap::new();
- coordinator_confs.insert("zookeeper.address", "zookeeper:2181");
+ coordinator_confs.insert(
+ "zookeeper.address",
+ format!("{}:2181", self.zookeeper_container_name()),
+ );
coordinator_confs.insert(
"bind.listeners",
- "INTERNAL://coordinator-server:0,
CLIENT://coordinator-server:9123",
+ format!(
+ "INTERNAL://{}:0, CLIENT://{}:9123",
+ self.coordinator_server_container_name(),
+ self.coordinator_server_container_name()
+ ),
);
- coordinator_confs.insert("advertised.listeners",
"CLIENT://localhost:9123");
- coordinator_confs.insert("internal.listener.name", "INTERNAL");
+ coordinator_confs.insert(
+ "advertised.listeners",
+ "CLIENT://localhost:9123".to_string(),
+ );
+ coordinator_confs.insert("internal.listener.name",
"INTERNAL".to_string());
GenericImage::new("fluss/fluss", FLUSS_VERSION)
- .with_container_name("coordinator-server")
+ .with_container_name(self.coordinator_server_container_name())
.with_mapped_port(9123, ContainerPort::Tcp(9123))
.with_network(self.network)
.with_cmd(vec!["coordinatorServer"])
@@ -104,26 +128,30 @@ impl FlussTestingClusterBuilder {
.unwrap()
}
- async fn start_tablet_server(&self, server_id: usize) ->
ContainerAsync<GenericImage> {
+ async fn start_tablet_server(&self, server_id: i32) ->
ContainerAsync<GenericImage> {
let mut tablet_server_confs = HashMap::new();
let bind_listeners = format!(
- "INTERNAL://tablet-server-{}:0, CLIENT://tablet-server-{}:9123",
- server_id, server_id
+ "INTERNAL://{}:0, CLIENT://{}:9123",
+ self.tablet_server_container_name(server_id),
+ self.tablet_server_container_name(server_id),
);
let expose_host_port = 9124 + server_id;
let advertised_listeners = format!("CLIENT://localhost:{}",
expose_host_port);
let tablet_server_id = format!("{}", server_id);
- tablet_server_confs.insert("zookeeper.address", "zookeeper:2181");
- tablet_server_confs.insert("bind.listeners", bind_listeners.as_str());
- tablet_server_confs.insert("advertised.listeners",
advertised_listeners.as_str());
- tablet_server_confs.insert("internal.listener.name", "INTERNAL");
- tablet_server_confs.insert("tablet-server.id",
tablet_server_id.as_str());
+ tablet_server_confs.insert(
+ "zookeeper.address",
+ format!("{}:2181", self.zookeeper_container_name()),
+ );
+ tablet_server_confs.insert("bind.listeners", bind_listeners);
+ tablet_server_confs.insert("advertised.listeners",
advertised_listeners);
+ tablet_server_confs.insert("internal.listener.name",
"INTERNAL".to_string());
+ tablet_server_confs.insert("tablet-server.id", tablet_server_id);
GenericImage::new("fluss/fluss", FLUSS_VERSION)
.with_cmd(vec!["tabletServer"])
.with_mapped_port(expose_host_port as u16,
ContainerPort::Tcp(9123))
.with_network(self.network)
- .with_container_name(format!("tablet-server-{}", server_id))
+ .with_container_name(self.tablet_server_container_name(server_id))
.with_env_var(
"FLUSS_PROPERTIES",
self.to_fluss_properties_with(tablet_server_confs),
@@ -133,7 +161,7 @@ impl FlussTestingClusterBuilder {
.unwrap()
}
- fn to_fluss_properties_with(&self, extra_properties: HashMap<&str, &str>)
-> String {
+ fn to_fluss_properties_with(&self, extra_properties: HashMap<&str,
String>) -> String {
let mut fluss_properties = Vec::new();
for (k, v) in self.cluster_conf.iter() {
fluss_properties.push(format!("{}: {}", k, v));
@@ -150,7 +178,7 @@ impl FlussTestingClusterBuilder {
pub struct FlussTestingCluster {
zookeeper: Arc<ContainerAsync<GenericImage>>,
coordinator_server: Arc<ContainerAsync<GenericImage>>,
- tablet_servers: HashMap<usize, Arc<ContainerAsync<GenericImage>>>,
+ tablet_servers: HashMap<i32, Arc<ContainerAsync<GenericImage>>>,
bootstrap_servers: String,
}
@@ -165,6 +193,7 @@ impl FlussTestingCluster {
pub async fn get_fluss_connection(&self) -> FlussConnection {
let mut config = Config::default();
+ config.writer_acks = "all".to_string();
config.bootstrap_server = Some(self.bootstrap_servers.clone());
// Retry mechanism: retry for up to 1 minute
diff --git a/crates/fluss/tests/integration/table.rs
b/crates/fluss/tests/integration/table.rs
new file mode 100644
index 0000000..a1a6cb2
--- /dev/null
+++ b/crates/fluss/tests/integration/table.rs
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::integration::fluss_cluster::FlussTestingCluster;
+use once_cell::sync::Lazy;
+use parking_lot::RwLock;
+use std::sync::Arc;
+
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
+ Lazy::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod table_test {
+ use super::SHARED_FLUSS_CLUSTER;
+ use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
+ use crate::integration::utils::create_table;
+ use arrow::array::record_batch;
+ use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+ use std::sync::Arc;
+ use std::sync::atomic::AtomicUsize;
+ use std::thread;
+ fn before_all() {
+ // Create a new tokio runtime in a separate thread
+ let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+ std::thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
+ rt.block_on(async {
+ let cluster =
FlussTestingClusterBuilder::new("test_table").build().await;
+ let mut guard = cluster_guard.write();
+ *guard = Some(cluster);
+ });
+ })
+ .join()
+ .expect("Failed to create cluster");
+ // wait for 20 seconds to avoid the error like
+ // CoordinatorEventProcessor is not initialized yet
+ thread::sleep(std::time::Duration::from_secs(20));
+ }
+
+ fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+ let cluster_guard = SHARED_FLUSS_CLUSTER.read();
+ if cluster_guard.is_none() {
+ panic!("Fluss cluster not initialized. Make sure before_all() was
called.");
+ }
+ Arc::new(cluster_guard.as_ref().unwrap().clone())
+ }
+
+ fn after_all() {
+ // Create a new tokio runtime in a separate thread
+ let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+ std::thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
+ rt.block_on(async {
+ let mut guard = cluster_guard.write();
+ if let Some(cluster) = guard.take() {
+ cluster.stop().await;
+ }
+ });
+ })
+ .join()
+ .expect("Failed to cleanup cluster");
+ }
+
+ #[tokio::test]
+ async fn append_record_batch() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path =
+ TablePath::new("fluss".to_string(),
"test_append_record_batch".to_string());
+
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("c1", DataTypes::int())
+ .column("c2", DataTypes::string())
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let append_writer = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table")
+ .new_append()
+ .expect("Failed to create append")
+ .create_writer();
+
+ let batch1 =
+ record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2",
"a3"])).unwrap();
+ append_writer
+ .append_arrow_batch(batch1)
+ .await
+ .expect("Failed to append batch");
+
+ let batch2 =
+ record_batch!(("c1", Int32, [4, 5, 6]), ("c2", Utf8, ["a4", "a5",
"a6"])).unwrap();
+ append_writer
+ .append_arrow_batch(batch2)
+ .await
+ .expect("Failed to append batch");
+
+ // todo: add scan code to verify the records appended in #30
+ }
+}
diff --git a/crates/fluss/tests/integration/utils.rs
b/crates/fluss/tests/integration/utils.rs
new file mode 100644
index 0000000..cd1f6cc
--- /dev/null
+++ b/crates/fluss/tests/integration/utils.rs
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use fluss::client::FlussAdmin;
+use fluss::metadata::{TableDescriptor, TablePath};
+
+pub async fn create_table(
+ admin: &FlussAdmin,
+ table_path: &TablePath,
+ table_descriptor: &TableDescriptor,
+) {
+ admin
+ .create_table(&table_path, &table_descriptor, false)
+ .await
+ .expect("Failed to create table");
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index 28b9bef..a15ca23 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -22,4 +22,7 @@ extern crate fluss;
mod integration {
mod admin;
mod fluss_cluster;
+ mod table;
+
+ mod utils;
}