fresh-borzoni commented on code in PR #404:
URL: https://github.com/apache/fluss-rust/pull/404#discussion_r2886311018


##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -54,29 +65,137 @@ impl Sender {
         max_request_timeout_ms: i32,
         ack: i16,
         retries: i32,
+        idempotence_manager: Arc<IdempotenceManager>,
     ) -> Self {
         Self {
-            running: true,
+            running: AtomicBool::new(true),
             metadata,
             accumulator,
             in_flight_batches: Default::default(),
             max_request_size,
             ack,
             max_request_timeout_ms,
             retries,
+            idempotence_manager,
         }
     }
 
+    #[allow(dead_code)]
     pub async fn run(&self) -> Result<()> {
         loop {
-            if !self.running {
+            if !self.running.load(Ordering::Relaxed) {
                 return Ok(());
             }
             self.run_once().await?;
         }
     }
 
-    async fn run_once(&self) -> Result<()> {
+    const WRITER_ID_RETRY_TIMES: u32 = 3;
+    const WRITER_ID_RETRY_INTERVAL_MS: u64 = 100;
+
+    async fn maybe_wait_for_writer_id(&self) -> Result<()> {
+        if !self.idempotence_manager.is_enabled() || 
self.idempotence_manager.has_writer_id() {
+            return Ok(());
+        }
+        let mut retry_count = 0u32;
+        loop {
+            match self.try_init_writer_id().await {
+                Ok(()) => return Ok(()),
+                Err(e) => {
+                    // Authorization errors are not transient — fail 
immediately.
+                    if e.api_error() == 
Some(FlussError::AuthorizationException) {
+                        return Err(e);
+                    }
+                    if retry_count >= Self::WRITER_ID_RETRY_TIMES {
+                        return Err(e);
+                    }
+                    if 
e.api_error().is_some_and(Self::is_invalid_metadata_error) {
+                        let physical_paths = 
self.accumulator.get_physical_table_paths_in_batches();
+                        let physical_refs: HashSet<&Arc<PhysicalTablePath>> =
+                            physical_paths.iter().collect();
+                        if let Err(meta_err) = self
+                            .metadata
+                            .update_tables_metadata(&HashSet::new(), 
&physical_refs, vec![])
+                            .await
+                        {
+                            warn!("Failed to refresh metadata after writer ID 
error: {meta_err}");
+                        }
+                    }
+                    retry_count += 1;
+                    let delay_ms = Self::WRITER_ID_RETRY_INTERVAL_MS * 
2u64.pow(retry_count);
+                    warn!(
+                        "Failed to allocate writer ID (attempt 
{retry_count}/{}), retrying in {delay_ms}ms: {e}",
+                        Self::WRITER_ID_RETRY_TIMES,
+                    );
+                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
+                }
+            }
+        }
+    }
+
+    async fn try_init_writer_id(&self) -> Result<()> {
+        // Deduplicate by (database, table) since multiple physical paths 
(partitions)
+        // may share the same table. Matches Java's Set<TablePath> dedup.
+        let mut seen = HashSet::new();
+        let table_paths: Vec<PbTablePath> = self
+            .accumulator
+            .get_physical_table_paths_in_batches()
+            .iter()
+            .filter_map(|path| {
+                let key = (
+                    path.get_database_name().to_string(),
+                    path.get_table_name().to_string(),
+                );
+                if seen.insert(key.clone()) {
+                    Some(PbTablePath {
+                        database_name: key.0,
+                        table_name: key.1,
+                    })
+                } else {
+                    None
+                }
+            })
+            .collect();
+        if table_paths.is_empty() {
+            debug!("No table paths in batches, skipping writer ID allocation");
+            return Ok(());
+        }
+        let cluster = self.metadata.get_cluster();
+        let server = cluster.get_one_available_server().ok_or(UnexpectedError {
+            message: "No tablet server available to allocate writer 
ID".to_string(),
+            source: None,
+        })?;
+        let connection = self.metadata.get_connection(server).await?;
+        let response = connection
+            .request(InitWriterRequest::new(table_paths))
+            .await?;
+        self.idempotence_manager.set_writer_id(response.writer_id);
+        debug!(
+            "Allocated writer ID {} for idempotent writes",
+            response.writer_id
+        );
+        Ok(())
+    }
+
+    fn maybe_abort_batches(&self, error: &crate::error::Error) {
+        if self.accumulator.has_incomplete() {
+            warn!("Aborting write batches due to fatal error: {error}");
+            self.accumulator.abort_batches(broadcast::Error::Client {
+                message: format!("Writer ID allocation failed: {error}"),
+            });
+        }
+    }
+
+    /// Drain batches and return per-leader send futures without awaiting them.
+    /// Returns `(send_futures, next_check_delay)` where `next_check_delay` is
+    /// `Some(ms)` when no nodes were ready (caller should sleep).
+    async fn prepare_sends(&self) -> Result<(Vec<SendFuture<'_>>, 
Option<u64>)> {
+        if let Err(e) = self.maybe_wait_for_writer_id().await {
+            warn!("Failed to allocate writer ID after retries: {e}");
+            self.maybe_abort_batches(&e);
+            return Ok((vec![], None));

Review Comment:
   The 1ms spin doesn't happen in practice. `maybe_abort_batches` clears all 
pending batches, so the next iteration's `try_init_writer_id` finds no table 
paths and returns Ok(()) early, falling through to `accumulator.ready()` which  
provides a proper delay.                                                        
                                                                                
                                                                                
   
                             



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to