Copilot commented on code in PR #404:
URL: https://github.com/apache/fluss-rust/pull/404#discussion_r2882566860
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -15,21 +15,142 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client::broadcast;
+use crate::client::write::IdempotenceManager;
use crate::client::write::batch::WriteBatch::{ArrowLog, Kv};
use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch,
WriteBatch};
use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
use crate::cluster::{BucketLocation, Cluster, ServerNode};
use crate::config::Config;
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, TableBucket};
+use crate::record::{NO_BATCH_SEQUENCE, NO_WRITER_ID};
use crate::util::current_time_ms;
use crate::{BucketId, PartitionId, TableId};
use dashmap::DashMap;
-use parking_lot::Mutex;
-use parking_lot::RwLock;
+use parking_lot::{Condvar, Mutex, RwLock};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
-use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicUsize,
Ordering};
+use std::time::{Duration, Instant};
+
+/// Byte-counting semaphore that blocks producers when total buffered memory
+/// exceeds the configured limit. Matches Java's `LazyMemorySegmentPool`
behavior.
+///
+/// TODO: Replace `notify_all()` with per-waiter FIFO signaling (Java uses
per-request
+/// Condition objects in a Deque) to avoid thundering herd under high
contention.
+///
+/// TODO: Track actual batch memory usage instead of reserving a fixed
`writer_batch_size`
+/// per batch. This over-counts when batches don't fill completely, reducing
effective
+/// throughput. Requires tighter coupling with batch internals.
+pub(crate) struct MemoryLimiter {
+ state: Mutex<usize>,
+ cond: Condvar,
+ max_memory: usize,
+ wait_timeout: Duration,
+ closed: AtomicBool,
+ waiting_count: AtomicUsize,
+}
+
+impl MemoryLimiter {
+ pub fn new(max_memory: usize, wait_timeout: Duration) -> Self {
+ Self {
+ state: Mutex::new(0),
+ cond: Condvar::new(),
+ max_memory,
+ wait_timeout,
+ closed: AtomicBool::new(false),
+ waiting_count: AtomicUsize::new(0),
+ }
+ }
+
+ /// Try to acquire `size` bytes. Blocks until memory is available,
+ /// the timeout expires, or the limiter is closed.
+ /// Returns a `MemoryPermit` on success.
+ pub fn acquire(self: &Arc<Self>, size: usize) -> Result<MemoryPermit> {
+ if self.closed.load(Ordering::Acquire) {
+ return Err(Error::IllegalArgument {
+ message: "Memory limiter is closed".to_string(),
+ });
+ }
Review Comment:
`MemoryLimiter::acquire` returns `Error::IllegalArgument` when the limiter
is closed. That condition is a runtime shutdown/backpressure state rather than
an invalid argument, which can be misleading for callers that treat
`IllegalArgument` as a configuration/programming error. Consider using a more
appropriate error variant (e.g., `UnexpectedError` / a dedicated "closed"
error) so callers can distinguish shutdown from bad input.
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -54,29 +65,137 @@ impl Sender {
max_request_timeout_ms: i32,
ack: i16,
retries: i32,
+ idempotence_manager: Arc<IdempotenceManager>,
) -> Self {
Self {
- running: true,
+ running: AtomicBool::new(true),
metadata,
accumulator,
in_flight_batches: Default::default(),
max_request_size,
ack,
max_request_timeout_ms,
retries,
+ idempotence_manager,
}
}
+ #[allow(dead_code)]
pub async fn run(&self) -> Result<()> {
loop {
- if !self.running {
+ if !self.running.load(Ordering::Relaxed) {
return Ok(());
}
self.run_once().await?;
}
}
- async fn run_once(&self) -> Result<()> {
+ const WRITER_ID_RETRY_TIMES: u32 = 3;
+ const WRITER_ID_RETRY_INTERVAL_MS: u64 = 100;
+
+ async fn maybe_wait_for_writer_id(&self) -> Result<()> {
+ if !self.idempotence_manager.is_enabled() ||
self.idempotence_manager.has_writer_id() {
+ return Ok(());
+ }
+ let mut retry_count = 0u32;
+ loop {
+ match self.try_init_writer_id().await {
+ Ok(()) => return Ok(()),
+ Err(e) => {
+ // Authorization errors are not transient — fail
immediately.
+ if e.api_error() ==
Some(FlussError::AuthorizationException) {
+ return Err(e);
+ }
+ if retry_count >= Self::WRITER_ID_RETRY_TIMES {
+ return Err(e);
+ }
+ if
e.api_error().is_some_and(Self::is_invalid_metadata_error) {
+ let physical_paths =
self.accumulator.get_physical_table_paths_in_batches();
+ let physical_refs: HashSet<&Arc<PhysicalTablePath>> =
+ physical_paths.iter().collect();
+ if let Err(meta_err) = self
+ .metadata
+ .update_tables_metadata(&HashSet::new(),
&physical_refs, vec![])
+ .await
+ {
+ warn!("Failed to refresh metadata after writer ID
error: {meta_err}");
+ }
+ }
+ retry_count += 1;
+ let delay_ms = Self::WRITER_ID_RETRY_INTERVAL_MS *
2u64.pow(retry_count);
+ warn!(
+ "Failed to allocate writer ID (attempt
{retry_count}/{}), retrying in {delay_ms}ms: {e}",
+ Self::WRITER_ID_RETRY_TIMES,
+ );
+ tokio::time::sleep(Duration::from_millis(delay_ms)).await;
+ }
+ }
+ }
+ }
+
+ async fn try_init_writer_id(&self) -> Result<()> {
+ // Deduplicate by (database, table) since multiple physical paths
(partitions)
+ // may share the same table. Matches Java's Set<TablePath> dedup.
+ let mut seen = HashSet::new();
+ let table_paths: Vec<PbTablePath> = self
+ .accumulator
+ .get_physical_table_paths_in_batches()
+ .iter()
+ .filter_map(|path| {
+ let key = (
+ path.get_database_name().to_string(),
+ path.get_table_name().to_string(),
+ );
+ if seen.insert(key.clone()) {
+ Some(PbTablePath {
+ database_name: key.0,
+ table_name: key.1,
+ })
+ } else {
+ None
+ }
+ })
+ .collect();
+ if table_paths.is_empty() {
+ debug!("No table paths in batches, skipping writer ID allocation");
+ return Ok(());
+ }
+ let cluster = self.metadata.get_cluster();
+ let server = cluster.get_one_available_server().ok_or(UnexpectedError {
+ message: "No tablet server available to allocate writer
ID".to_string(),
+ source: None,
+ })?;
+ let connection = self.metadata.get_connection(server).await?;
+ let response = connection
+ .request(InitWriterRequest::new(table_paths))
+ .await?;
+ self.idempotence_manager.set_writer_id(response.writer_id);
+ debug!(
+ "Allocated writer ID {} for idempotent writes",
+ response.writer_id
+ );
+ Ok(())
+ }
+
+ fn maybe_abort_batches(&self, error: &crate::error::Error) {
+ if self.accumulator.has_incomplete() {
+ warn!("Aborting write batches due to fatal error: {error}");
+ self.accumulator.abort_batches(broadcast::Error::Client {
+ message: format!("Writer ID allocation failed: {error}"),
+ });
+ }
+ }
+
+ /// Drain batches and return per-leader send futures without awaiting them.
+ /// Returns `(send_futures, next_check_delay)` where `next_check_delay` is
+ /// `Some(ms)` when no nodes were ready (caller should sleep).
+ async fn prepare_sends(&self) -> Result<(Vec<SendFuture<'_>>,
Option<u64>)> {
+ if let Err(e) = self.maybe_wait_for_writer_id().await {
+ warn!("Failed to allocate writer ID after retries: {e}");
+ self.maybe_abort_batches(&e);
+ return Ok((vec![], None));
Review Comment:
`prepare_sends` swallows writer-ID initialization failures by returning
`Ok((vec![], None))`. In `run_with_shutdown`, an empty future set + `None`
delay leads to a ~1ms sleep loop, so a permanent failure (especially
`AuthorizationException`, which you explicitly treat as non-transient) can
cause log spam and unnecessary CPU usage. Consider propagating the error (so
the sender loop can exit) or returning an appropriate backoff delay for fatal
init failures.
```suggestion
return Err(e);
```
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -15,21 +15,142 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client::broadcast;
+use crate::client::write::IdempotenceManager;
use crate::client::write::batch::WriteBatch::{ArrowLog, Kv};
use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch,
WriteBatch};
use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
use crate::cluster::{BucketLocation, Cluster, ServerNode};
use crate::config::Config;
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, TableBucket};
+use crate::record::{NO_BATCH_SEQUENCE, NO_WRITER_ID};
use crate::util::current_time_ms;
use crate::{BucketId, PartitionId, TableId};
use dashmap::DashMap;
-use parking_lot::Mutex;
-use parking_lot::RwLock;
+use parking_lot::{Condvar, Mutex, RwLock};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
-use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicUsize,
Ordering};
+use std::time::{Duration, Instant};
+
+/// Byte-counting semaphore that blocks producers when total buffered memory
+/// exceeds the configured limit. Matches Java's `LazyMemorySegmentPool`
behavior.
+///
+/// TODO: Replace `notify_all()` with per-waiter FIFO signaling (Java uses
per-request
+/// Condition objects in a Deque) to avoid thundering herd under high
contention.
+///
+/// TODO: Track actual batch memory usage instead of reserving a fixed
`writer_batch_size`
+/// per batch. This over-counts when batches don't fill completely, reducing
effective
+/// throughput. Requires tighter coupling with batch internals.
+pub(crate) struct MemoryLimiter {
+ state: Mutex<usize>,
+ cond: Condvar,
+ max_memory: usize,
+ wait_timeout: Duration,
+ closed: AtomicBool,
+ waiting_count: AtomicUsize,
+}
+
+impl MemoryLimiter {
+ pub fn new(max_memory: usize, wait_timeout: Duration) -> Self {
+ Self {
+ state: Mutex::new(0),
+ cond: Condvar::new(),
+ max_memory,
+ wait_timeout,
+ closed: AtomicBool::new(false),
+ waiting_count: AtomicUsize::new(0),
+ }
+ }
+
+ /// Try to acquire `size` bytes. Blocks until memory is available,
+ /// the timeout expires, or the limiter is closed.
+ /// Returns a `MemoryPermit` on success.
+ pub fn acquire(self: &Arc<Self>, size: usize) -> Result<MemoryPermit> {
+ if self.closed.load(Ordering::Acquire) {
+ return Err(Error::IllegalArgument {
+ message: "Memory limiter is closed".to_string(),
+ });
+ }
+
+ if size > self.max_memory {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Batch size {} exceeds total buffer memory limit {}",
+ size, self.max_memory
+ ),
+ });
+ }
+
+ let mut used = self.state.lock();
+ let deadline = Instant::now() + self.wait_timeout;
+ while *used + size > self.max_memory {
+ self.waiting_count.fetch_add(1, Ordering::Relaxed);
+ let result = self.cond.wait_until(&mut used, deadline);
+ self.waiting_count.fetch_sub(1, Ordering::Relaxed);
+
+ if self.closed.load(Ordering::Acquire) {
+ return Err(Error::IllegalArgument {
+ message: "Memory limiter is closed".to_string(),
+ });
+ }
+ if result.timed_out() && *used + size > self.max_memory {
+ return Err(Error::IllegalArgument {
Review Comment:
`MemoryLimiter::acquire` reports allocation timeouts as
`Error::IllegalArgument`. A timeout is a runtime/operational failure (resource
exhaustion) rather than an invalid argument, and classifying it as
`IllegalArgument` makes it harder to handle separately (e.g., retry vs.
fail-fast). Consider returning a more suitable error variant (or introducing a
dedicated timeout/backpressure error) for this path.
```suggestion
return Err(Error::Timeout {
```
##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -146,21 +163,48 @@ impl WriterClient {
Ok((bucket_assigner, bucket_id))
}
- pub async fn close(self) -> Result<()> {
- self.shutdown_tx
- .send(())
- .await
- .map_err(|e| Error::UnexpectedError {
- message: format!("Failed to close write client: {e:?}"),
- source: None,
- })?;
-
- self.sender_join_handle
- .await
- .map_err(|e| Error::UnexpectedError {
- message: format!("Failed to close write client: {e:?}"),
- source: None,
- })?;
+ /// Close the writer with a timeout. Matches Java's two-phase shutdown:
+ ///
+ /// 1. **Graceful**: Signal the sender to drain all remaining batches.
+ /// `accumulator.close()` makes all batches immediately ready (no need
+ /// to wait for `batch_timeout_ms`).
+ /// 2. **Force** (if timeout exceeded): Abort the sender task and fail
+ /// all remaining batches with an error.
+ ///
+ /// Idempotent: calling `close` a second time returns `Ok(())` immediately.
+ pub async fn close(&self, timeout: Duration) -> Result<()> {
+ // Take shutdown_tx and join_handle out of their Mutexes.
Review Comment:
`WriterClient::close` changed from consuming `self` with no args to `&self`
and requiring a `timeout: Duration`. Since `WriterClient` is re-exported
publicly, this is a breaking API change for downstream callers. Consider
keeping a backwards-compatible `close()` (with a default timeout) and/or
introducing `close_with_timeout()` while preserving the old signature during a
deprecation period.
##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -146,21 +163,48 @@ impl WriterClient {
Ok((bucket_assigner, bucket_id))
}
- pub async fn close(self) -> Result<()> {
- self.shutdown_tx
- .send(())
- .await
- .map_err(|e| Error::UnexpectedError {
- message: format!("Failed to close write client: {e:?}"),
- source: None,
- })?;
-
- self.sender_join_handle
- .await
- .map_err(|e| Error::UnexpectedError {
- message: format!("Failed to close write client: {e:?}"),
- source: None,
- })?;
+ /// Close the writer with a timeout. Matches Java's two-phase shutdown:
+ ///
+ /// 1. **Graceful**: Signal the sender to drain all remaining batches.
+ /// `accumulator.close()` makes all batches immediately ready (no need
+ /// to wait for `batch_timeout_ms`).
+ /// 2. **Force** (if timeout exceeded): Abort the sender task and fail
+ /// all remaining batches with an error.
+ ///
+ /// Idempotent: calling `close` a second time returns `Ok(())` immediately.
+ pub async fn close(&self, timeout: Duration) -> Result<()> {
+ // Take shutdown_tx and join_handle out of their Mutexes.
+ // Second call sees None and returns early.
+ let shutdown_tx = self.shutdown_tx.lock().take();
+ let join_handle = self.sender_join_handle.lock().take();
+
+ let Some(mut join_handle) = join_handle else {
+ return Ok(());
+ };
+
+ // Phase 1: Signal graceful shutdown.
+ // Mark accumulator closed so all batches become immediately sendable.
+ self.accumulate.close();
+ // Drop the shutdown sender — recv() returns None, breaking the sender
loop.
+ drop(shutdown_tx);
Review Comment:
Because `close` now takes `&self` (instead of consuming `self`), other tasks
can keep calling `send()` concurrently while shutdown is in progress. That can
prevent the sender loop from ever finishing its “graceful drain” phase (it will
keep finding new undrained batches) and makes timeouts/force-close much more
likely. Consider adding an explicit "closing/closed" flag checked by `send()`
(returning an error) or otherwise preventing new appends once shutdown begins.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]