This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch improve-log-poll
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git

commit 3bd59edc14997bd41691e657c73909e4d2df4f9f
Author: luoyuxia <[email protected]>
AuthorDate: Mon Dec 15 09:28:05 2025 +0800

    improve log poll
---
 crates/fluss/src/client/table/log_fetch_buffer.rs | 417 ++++++++++++++++++++
 crates/fluss/src/client/table/mod.rs              |   1 +
 crates/fluss/src/client/table/remote_log.rs       | 141 +++++--
 crates/fluss/src/client/table/scanner.rs          | 452 +++++++++++++++++-----
 crates/fluss/tests/integration/table.rs           |   2 +-
 5 files changed, 882 insertions(+), 131 deletions(-)

diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
new file mode 100644
index 0000000..eca4de9
--- /dev/null
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -0,0 +1,417 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{Error, Result};
+use crate::metadata::TableBucket;
+use crate::proto::PbFetchLogRespForBucket;
+use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
+use parking_lot::Mutex;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::Notify;
+
+/// Represents a completed fetch that can be consumed
+pub trait CompletedFetch: Send + Sync {
+    fn table_bucket(&self) -> &TableBucket;
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+    fn next_fetch_offset(&self) -> i64;
+    fn is_consumed(&self) -> bool;
+    fn drain(&mut self);
+    fn size_in_bytes(&self) -> usize;
+    fn high_watermark(&self) -> i64;
+    fn is_initialized(&self) -> bool;
+    fn set_initialized(&mut self);
+    fn fetch_offset(&self) -> i64;
+}
+
+/// Represents a pending fetch that is waiting to be completed
+pub trait PendingFetch: Send + Sync {
+    fn table_bucket(&self) -> &TableBucket;
+    fn is_completed(&self) -> bool;
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>>;
+}
+
+/// Thread-safe buffer for completed fetches
+pub struct LogFetchBuffer {
+    completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
+    pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn PendingFetch>>>>,
+    next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
+    not_empty_notify: Notify,
+    woken_up: Arc<AtomicBool>,
+}
+
+impl LogFetchBuffer {
+    pub fn new() -> Self {
+        Self {
+            completed_fetches: Mutex::new(VecDeque::new()),
+            pending_fetches: Mutex::new(HashMap::new()),
+            next_in_line_fetch: Mutex::new(None),
+            not_empty_notify: Notify::new(),
+            woken_up: Arc::new(AtomicBool::new(false)),
+        }
+    }
+    
+    /// Check if the buffer is empty
+    pub fn is_empty(&self) -> bool {
+        self.completed_fetches.lock().is_empty()
+    }
+    
+    /// Wait for the buffer to become non-empty, with timeout
+    /// Returns true if data became available, false if timeout
+    pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+        let deadline = std::time::Instant::now() + timeout;
+        
+        loop {
+            // Check if buffer is not empty
+            if !self.is_empty() {
+                return true;
+            }
+            
+            // Check if woken up
+            if self.woken_up.swap(false, Ordering::Acquire) {
+                return true;
+            }
+            
+            // Check if timeout
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                return false;
+            }
+            
+            // Wait for notification with remaining time
+            let remaining = deadline - now;
+            let notified = self.not_empty_notify.notified();
+            tokio::select! {
+                _ = tokio::time::sleep(remaining) => {
+                    return false; // Timeout
+                }
+                _ = notified => {
+                    // Got notification, check again
+                    continue;
+                }
+            }
+        }
+    }
+    
+    /// Wake up any waiting threads
+    pub fn wakeup(&self) {
+        self.woken_up.store(true, Ordering::Release);
+        self.not_empty_notify.notify_waiters();
+    }
+
+    /// Add a pending fetch to the buffer
+    pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
+        let table_bucket = pending_fetch.table_bucket().clone();
+        self.pending_fetches
+            .lock()
+            .entry(table_bucket)
+            .or_insert_with(VecDeque::new)
+            .push_back(pending_fetch);
+    }
+
+    /// Try to complete pending fetches in order, converting them to completed fetches
+    pub fn try_complete(&self, table_bucket: &TableBucket) {
+        let mut pending_map = self.pending_fetches.lock();
+        if let Some(pendings) = pending_map.get_mut(table_bucket) {
+            let mut has_completed = false;
+            while let Some(front) = pendings.front() {
+                if front.is_completed() {
+                    if let Some(pending) = pendings.pop_front() {
+                        match pending.to_completed_fetch() {
+                            Ok(completed) => {
+                                self.completed_fetches.lock().push_back(completed);
+                                // Signal that buffer is not empty
+                                self.not_empty_notify.notify_waiters();
+                                has_completed = true;
+                            }
+                            Err(_) => {
+                                // Skip failed fetches
+                            }
+                        }
+                    }
+                } else {
+                    break;
+                }
+            }
+            if pendings.is_empty() {
+                pending_map.remove(table_bucket);
+            }
+        }
+    }
+
+    /// Add a completed fetch to the buffer
+    pub fn add(&self, completed_fetch: Box<dyn CompletedFetch>) {
+        let table_bucket = completed_fetch.table_bucket().clone();
+        let mut pending_map = self.pending_fetches.lock();
+        let should_notify = if let Some(pendings) = pending_map.get_mut(&table_bucket) {
+            if pendings.is_empty() {
+                self.completed_fetches.lock().push_back(completed_fetch);
+                true
+            } else {
+                // Convert to pending fetch wrapper
+                let completed_pending = CompletedPendingFetch::new(completed_fetch);
+                pendings.push_back(Box::new(completed_pending));
+                false
+            }
+        } else {
+            self.completed_fetches.lock().push_back(completed_fetch);
+            true
+        };
+        
+        // Signal that buffer is not empty if we added to completed_fetches
+        if should_notify {
+            self.not_empty_notify.notify_waiters();
+        }
+    }
+
+    /// Poll the next completed fetch
+    pub fn poll(&self) -> Option<Box<dyn CompletedFetch>> {
+        self.completed_fetches.lock().pop_front()
+    }
+
+    /// Get the next in line fetch
+    pub fn next_in_line_fetch(&self) -> Option<Box<dyn CompletedFetch>> {
+        self.next_in_line_fetch.lock().take()
+    }
+
+    /// Set the next in line fetch
+    pub fn set_next_in_line_fetch(&self, fetch: Option<Box<dyn CompletedFetch>>) {
+        *self.next_in_line_fetch.lock() = fetch;
+    }
+
+    /// Get the set of buckets that have buffered data
+    pub fn buffered_buckets(&self) -> Vec<TableBucket> {
+        let mut buckets = Vec::new();
+        let completed = self.completed_fetches.lock();
+        for fetch in completed.iter() {
+            buckets.push(fetch.table_bucket().clone());
+        }
+        let pending = self.pending_fetches.lock();
+        buckets.extend(pending.keys().cloned());
+        buckets
+    }
+}
+
+impl Default for LogFetchBuffer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Pending fetch that waits for fetch log response
+pub struct FetchPendingFetch {
+    table_bucket: TableBucket,
+    response: Arc<Mutex<Option<Result<PbFetchLogRespForBucket>>>>,
+    read_context: ReadContext,
+    fetch_offset: i64,
+}
+
+impl FetchPendingFetch {
+    pub fn new(
+        table_bucket: TableBucket,
+        read_context: ReadContext,
+        fetch_offset: i64,
+    ) -> (Self, Arc<Mutex<Option<Result<PbFetchLogRespForBucket>>>>) {
+        let response = Arc::new(Mutex::new(None));
+        let pending = Self {
+            table_bucket,
+            response: Arc::clone(&response),
+            read_context,
+            fetch_offset,
+        };
+        (pending, response)
+    }
+
+    pub fn set_response(&self, response: Result<PbFetchLogRespForBucket>) {
+        *self.response.lock() = Some(response);
+    }
+}
+
+impl PendingFetch for FetchPendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.table_bucket
+    }
+
+    fn is_completed(&self) -> bool {
+        self.response.lock().is_some()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        let response = self.response.lock().take().ok_or_else(|| {
+            Error::Io(std::io::Error::other("Fetch response not available"))
+        })??;
+        let completed = DefaultCompletedFetch::new(
+            self.table_bucket,
+            &response,
+            self.read_context,
+            self.fetch_offset,
+        )?;
+        Ok(Box::new(completed))
+    }
+}
+
+/// A wrapper that makes a completed fetch look like a pending fetch
+struct CompletedPendingFetch {
+    completed_fetch: Box<dyn CompletedFetch>,
+}
+
+impl CompletedPendingFetch {
+    fn new(completed_fetch: Box<dyn CompletedFetch>) -> Self {
+        Self { completed_fetch }
+    }
+}
+
+impl PendingFetch for CompletedPendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        self.completed_fetch.table_bucket()
+    }
+
+    fn is_completed(&self) -> bool {
+        true
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        Ok(self.completed_fetch)
+    }
+}
+
+/// Default implementation of CompletedFetch for in-memory log records
+pub struct DefaultCompletedFetch {
+    table_bucket: TableBucket,
+    data: Vec<u8>,
+    read_context: ReadContext,
+    fetch_offset: i64, // The offset at which this fetch started
+    next_fetch_offset: i64,
+    high_watermark: i64,
+    size_in_bytes: usize,
+    consumed: bool,
+    initialized: bool,
+    // Pre-parsed records for efficient access
+    records: Vec<ScanRecord>,
+    current_index: usize,
+}
+
+impl DefaultCompletedFetch {
+    pub fn new(
+        table_bucket: TableBucket,
+        fetch_response: &PbFetchLogRespForBucket,
+        read_context: ReadContext,
+        fetch_offset: i64,
+    ) -> Result<Self> {
+        let data = fetch_response.records.clone().unwrap_or_default();
+        let size_in_bytes = data.len();
+        let high_watermark = fetch_response.high_watermark.unwrap_or(-1);
+
+        // Parse all records upfront
+        let mut records = Vec::new();
+        for log_record in &mut LogRecordsBatchs::new(&data) {
+            records.extend(log_record.records(&read_context)?);
+        }
+
+        // Set next_fetch_offset based on the last record if available
+        let next_fetch_offset = if let Some(last_record) = records.last() {
+            last_record.offset() + 1
+        } else {
+            fetch_offset
+        };
+
+        Ok(Self {
+            table_bucket,
+            data,
+            read_context,
+            fetch_offset,
+            next_fetch_offset,
+            high_watermark,
+            size_in_bytes,
+            consumed: false,
+            initialized: false,
+            records,
+            current_index: 0,
+        })
+    }
+}
+
+impl CompletedFetch for DefaultCompletedFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.table_bucket
+    }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
+        if self.consumed {
+            return Ok(Vec::new());
+        }
+
+        let end_index = std::cmp::min(self.current_index + max_records, self.records.len());
+        let records = self.records[self.current_index..end_index].to_vec();
+        self.current_index = end_index;
+
+        if self.current_index >= self.records.len() {
+            self.consumed = true;
+            // Update next_fetch_offset based on the last record
+            if let Some(last_record) = self.records.last() {
+                self.next_fetch_offset = last_record.offset() + 1;
+            }
+        } else if let Some(last_record) = records.last() {
+            // Update next_fetch_offset as we consume records
+            self.next_fetch_offset = last_record.offset() + 1;
+        }
+
+        Ok(records)
+    }
+
+    fn next_fetch_offset(&self) -> i64 {
+        self.next_fetch_offset
+    }
+
+    fn is_consumed(&self) -> bool {
+        self.consumed
+    }
+
+    fn drain(&mut self) {
+        self.consumed = true;
+        self.current_index = self.records.len();
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.size_in_bytes
+    }
+
+    fn high_watermark(&self) -> i64 {
+        self.high_watermark
+    }
+
+    fn is_initialized(&self) -> bool {
+        self.initialized
+    }
+
+    fn set_initialized(&mut self) {
+        self.initialized = true;
+    }
+
+    fn fetch_offset(&self) -> i64 {
+        self.fetch_offset
+    }
+}
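Note on the await_not_empty() loop above: a poller re-checks the buffer after every notification and gives up once its deadline passes. A minimal, self-contained sketch of that Notify-plus-deadline pattern follows; names such as SharedFlag are illustrative only and not part of the crate, and the enable() call shown here additionally closes the small window in which a notify_waiters() issued before the waiter is registered would otherwise only be recovered by the timeout.

use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::Notify;

struct SharedFlag {
    ready: AtomicBool,
    notify: Notify,
}

impl SharedFlag {
    /// Wait until `set()` has been called or the timeout expires.
    async fn wait_until_set(&self, timeout: Duration) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if self.ready.load(Ordering::Acquire) {
                return true;
            }
            let now = Instant::now();
            if now >= deadline {
                return false;
            }
            // Register as a waiter *before* re-checking the flag so a
            // notify_waiters() call between the check and the await is not lost.
            let notified = self.notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();
            if self.ready.load(Ordering::Acquire) {
                return true;
            }
            tokio::select! {
                _ = tokio::time::sleep(deadline - now) => return false,
                _ = &mut notified => {} // woken up; loop and re-check the flag
            }
        }
    }

    fn set(&self) {
        self.ready.store(true, Ordering::Release);
        self.notify.notify_waiters();
    }
}

#[tokio::main]
async fn main() {
    let flag = std::sync::Arc::new(SharedFlag {
        ready: AtomicBool::new(false),
        notify: Notify::new(),
    });
    let setter = std::sync::Arc::clone(&flag);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        setter.set();
    });
    assert!(flag.wait_until_set(Duration::from_secs(1)).await);
}
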
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index 9972247..e2cf9e6 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;
 
 mod append;
 
+mod log_fetch_buffer;
 mod remote_log;
 mod scanner;
 mod writer;
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
index 10273dd..4c835bb 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -14,16 +14,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use crate::client::table::log_fetch_buffer::{CompletedFetch, DefaultCompletedFetch, PendingFetch};
 use crate::error::{Error, Result};
 use crate::io::{FileIO, Storage};
 use crate::metadata::TableBucket;
+use crate::proto::PbFetchLogRespForBucket;
 use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
+use crate::record::ReadContext;
 use crate::util::delete_file;
-use parking_lot::RwLock;
-use std::collections::HashMap;
+use parking_lot::Mutex;
 use std::io;
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 use tempfile::TempDir;
 use tokio::io::AsyncWriteExt;
 use tokio::sync::oneshot;
@@ -86,15 +88,67 @@ impl RemoteLogFetchInfo {
     }
 }
 
+type CompletionCallback = Box<dyn Fn() + Send + Sync>;
+
 /// Future for a remote log download request
 pub struct RemoteLogDownloadFuture {
-    receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
+    result: Arc<Mutex<Option<Result<PathBuf>>>>,
+    completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
 }
 
 impl RemoteLogDownloadFuture {
     pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self {
+        let result = Arc::new(Mutex::new(None));
+        let result_clone = Arc::clone(&result);
+        let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> = Arc::new(Mutex::new(Vec::new()));
+        let callbacks_clone = Arc::clone(&completion_callbacks);
+
+        // Spawn a task that waits for the download, stores the result, then runs the callbacks
+        tokio::spawn(async move {
+            let download_result = match receiver.await {
+                Ok(Ok(path)) => Ok(path),
+                Ok(Err(e)) => Err(e),
+                Err(e) => Err(Error::Io(io::Error::other(format!("Download future cancelled: {e:?}")))),
+            };
+            *result_clone.lock() = Some(download_result);
+
+            // Call all registered callbacks
+            // We need to take the callbacks to avoid holding the lock while calling them
+            // This also ensures that any callbacks registered after this point will be called immediately
+            let callbacks: Vec<CompletionCallback> = {
+                let mut callbacks_guard = callbacks_clone.lock();
+                std::mem::take(&mut *callbacks_guard)
+            };
+            for callback in callbacks {
+                callback();
+            }
+
+            // After calling callbacks, any new callbacks registered will see is_done() == true
+            // and will be called immediately in on_complete()
+        });
+
         Self {
-            receiver: Some(receiver),
+            result,
+            completion_callbacks,
+        }
+    }
+
+    /// Register a callback to be called when download completes (similar to Java's onComplete)
+    pub fn on_complete<F>(&self, callback: F)
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        // Check if already completed - need to check while holding the lock to avoid a race condition
+        let mut callbacks_guard = self.completion_callbacks.lock();
+        let is_done = self.is_done();
+
+        if is_done {
+            // If already completed, call immediately (drop lock first to avoid deadlock)
+            drop(callbacks_guard);
+            callback();
+        } else {
+            // Otherwise, register the callback
+            callbacks_guard.push(Box::new(callback));
         }
     }
 
@@ -135,7 +189,7 @@ impl RemoteLogDownloader {
         &self,
         remote_log_tablet_dir: &str,
         segment: &RemoteLogSegment,
-    ) -> Result<RemoteLogDownloadFuture> {
+    ) -> RemoteLogDownloadFuture {
         let (sender, receiver) = oneshot::channel();
         let local_file_name = segment.local_file_name();
         let local_file_path = self.local_log_dir.path().join(&local_file_name);
@@ -144,16 +198,11 @@ impl RemoteLogDownloader {
         let remote_fs_props = self.remote_fs_props.read().clone();
         // Spawn async download task
         tokio::spawn(async move {
-            let result = Self::download_file(
-                &remote_log_tablet_dir,
-                &remote_path,
-                &local_file_path,
-                &remote_fs_props,
-            )
-            .await;
+            let result =
+                Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await;
             let _ = sender.send(result);
         });
-        Ok(RemoteLogDownloadFuture::new(receiver))
+        RemoteLogDownloadFuture::new(receiver)
     }
 
     /// Build the remote path for a log segment
@@ -256,14 +305,13 @@ impl RemoteLogDownloader {
     }
 }
 
+
 /// Pending fetch that waits for remote log file to be downloaded
 pub struct RemotePendingFetch {
     segment: RemoteLogSegment,
     download_future: RemoteLogDownloadFuture,
     pos_in_log_segment: i32,
-    #[allow(dead_code)]
     fetch_offset: i64,
-    #[allow(dead_code)]
     high_watermark: i64,
     read_context: ReadContext,
 }
@@ -286,13 +334,26 @@ impl RemotePendingFetch {
             read_context,
         }
     }
+}
 
-    /// Convert to completed fetch by reading the downloaded file
-    pub async fn convert_to_completed_fetch(
-        mut self,
-    ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
-        let file_path = self.download_future.get_file_path().await?;
-        let file_data = tokio::fs::read(&file_path).await?;
+impl PendingFetch for RemotePendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.segment.table_bucket
+    }
+
+    fn is_completed(&self) -> bool {
+        self.download_future.is_done()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        // Get the file path (this should only be called when is_completed() returns true)
+        let file_path = self.download_future.get_file_path()?;
+
+        // Read the file data synchronously (we're in a sync context)
+        // Note: This is a limitation - we need to use blocking I/O here
+        let file_data = std::fs::read(&file_path).map_err(|e| {
+            Error::Io(io::Error::other(format!("Failed to read downloaded file: {e:?}")))
+        })?;
 
         // Slice the data if needed
         let data = if self.pos_in_log_segment > 0 {
@@ -301,17 +362,33 @@ impl RemotePendingFetch {
             &file_data
         };
 
-        // delete the downloaded local file to free disk
-        delete_file(file_path).await;
+        // Create a mock PbFetchLogRespForBucket for DefaultCompletedFetch
+        // We'll use the data we read from the file
+        let fetch_response = PbFetchLogRespForBucket {
+            bucket_id: self.segment.table_bucket.bucket_id(),
+            partition_id: None,
+            error_code: None,
+            error_message: None,
+            high_watermark: Some(self.high_watermark),
+            log_start_offset: None,
+            remote_log_fetch_info: None,
+            records: Some(data.to_vec()),
+        };
 
-        // Parse log records (remote log contains full data, need client-side projection)
-        let mut fetch_records = vec![];
-        for log_record in &mut LogRecordsBatchs::new(data) {
-            fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
-        }
+        // Create DefaultCompletedFetch from the data
+        let completed_fetch = DefaultCompletedFetch::new(
+            self.segment.table_bucket,
+            &fetch_response,
+            self.read_context,
+            self.fetch_offset,
+        )?;
+
+        // Delete the downloaded local file to free disk (async, but we'll do it in the background)
+        let file_path_clone = file_path.clone();
+        tokio::spawn(async move {
+            let _ = delete_file(file_path_clone).await;
+        });
 
-        let mut result = HashMap::new();
-        result.insert(self.segment.table_bucket.clone(), fetch_records);
-        Ok(result)
+        Ok(Box::new(completed_fetch))
     }
 }
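Note on RemoteLogDownloadFuture above: the spawned task stores the download result behind an Arc<Mutex<Option<..>>> and then drains and runs the registered callbacks, while on_complete() runs a callback immediately if the future is already done. A condensed, self-contained sketch of that pattern follows; names such as DownloadHandle are hypothetical and not the crate's API.

use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot;

type Callback = Box<dyn Fn() + Send + Sync>;

struct DownloadHandle {
    result: Arc<Mutex<Option<String>>>,
    callbacks: Arc<Mutex<Vec<Callback>>>,
}

impl DownloadHandle {
    fn new(receiver: oneshot::Receiver<String>) -> Self {
        let result = Arc::new(Mutex::new(None));
        let callbacks: Arc<Mutex<Vec<Callback>>> = Arc::new(Mutex::new(Vec::new()));
        let (r, c) = (Arc::clone(&result), Arc::clone(&callbacks));
        tokio::spawn(async move {
            // Store the outcome first, then drain and run the callbacks
            // without holding the lock while they execute.
            let value = receiver.await.unwrap_or_else(|_| "cancelled".to_string());
            *r.lock() = Some(value);
            let to_run: Vec<Callback> = std::mem::take(&mut *c.lock());
            for cb in to_run {
                cb();
            }
        });
        Self { result, callbacks }
    }

    fn is_done(&self) -> bool {
        self.result.lock().is_some()
    }

    /// Run `cb` when the download finishes; if it already finished, run it now.
    fn on_complete<F: Fn() + Send + Sync + 'static>(&self, cb: F) {
        let mut guard = self.callbacks.lock();
        if self.is_done() {
            // Already finished: run the callback directly, outside the lock.
            drop(guard);
            cb();
        } else {
            guard.push(Box::new(cb));
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let handle = DownloadHandle::new(rx);
    handle.on_complete(|| println!("download finished"));
    tx.send("local/segment-0.log".to_string()).unwrap();
    tokio::time::sleep(Duration::from_millis(10)).await;
    assert!(handle.is_done());
}
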
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 1e70649..5053865 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -21,17 +21,20 @@ use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
-use crate::rpc::RpcClient;
+use crate::record::{to_arrow_schema, ReadContext, ScanRecord, ScanRecords};
+use crate::rpc::{message, RpcClient};
 use crate::util::FairBucketStatusMap;
 use arrow_schema::SchemaRef;
-use parking_lot::RwLock;
-use std::collections::HashMap;
+use parking_lot::{Mutex, RwLock};
+use std::collections::{HashMap, HashSet};
 use std::slice::from_ref;
 use std::sync::Arc;
 use std::time::Duration;
 use tempfile::TempDir;
-
+use log::warn;
+use crate::client::table::log_fetch_buffer::{
+    CompletedFetch, DefaultCompletedFetch, LogFetchBuffer,
+};
 use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
@@ -171,8 +174,39 @@ impl LogScanner {
         })
     }
 
-    pub async fn poll(&self, _timeout: Duration) -> Result<ScanRecords> {
-        Ok(ScanRecords::new(self.poll_for_fetches().await?))
+    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+        let start = std::time::Instant::now();
+        let deadline = start + timeout;
+
+        loop {
+            // Try to collect fetches
+            let fetch_result = self.poll_for_fetches().await?;
+
+            if !fetch_result.is_empty() {
+                // We have data, send next round of fetches and return
+                // This enables pipelining while the user processes the data
+                self.log_fetcher.send_fetches().await?;
+                return Ok(ScanRecords::new(fetch_result));
+            }
+
+            // No data available, check if we should wait
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                // Timeout reached, return empty result
+                return Ok(ScanRecords::new(HashMap::new()));
+            }
+
+            // Wait for buffer to become non-empty with remaining time
+            let remaining = deadline - now;
+            let has_data = self.log_fetcher.log_fetch_buffer.await_not_empty(remaining).await;
+
+            if !has_data {
+                // Timeout while waiting
+                return Ok(ScanRecords::new(HashMap::new()));
+            }
+
+            // Buffer became non-empty, try again
+        }
     }
 
     pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
@@ -186,11 +220,14 @@ impl LogScanner {
     }
 
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
-        self.log_fetcher.send_fetches_and_collect().await
+        // Send fetch requests (non-blocking)
+        self.log_fetcher.send_fetches().await?;
+
+        // Collect completed fetches from buffer
+        self.log_fetcher.collect_fetches()
     }
 }
 
-#[allow(dead_code)]
 struct LogFetcher {
     table_path: TablePath,
     conns: Arc<RpcClient>,
@@ -198,8 +235,10 @@ struct LogFetcher {
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     read_context: ReadContext,
-    remote_log_downloader: RemoteLogDownloader,
+    remote_log_downloader: Arc<RemoteLogDownloader>,
     credentials_cache: CredentialsCache,
+    log_fetch_buffer: Arc<LogFetchBuffer>,
+    nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
 }
 
 impl LogFetcher {
@@ -222,8 +261,10 @@ impl LogFetcher {
             metadata,
             log_scanner_status,
             read_context,
-            remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
+            remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?),
             credentials_cache: CredentialsCache::new(),
+            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
         })
     }
 
@@ -237,51 +278,108 @@ impl LogFetcher {
         }
     }
 
-    async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+    /// Send fetch requests asynchronously without waiting for responses
+    async fn send_fetches(&self) -> Result<()> {
         let fetch_request = self.prepare_fetch_log_requests().await;
-        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
         for (leader, fetch_request) in fetch_request {
-            let cluster = self.metadata.get_cluster();
-            let server_node = cluster
-                .get_tablet_server(leader)
-                .expect("todo: handle leader not exist.");
-            let con = self.conns.get_connection(server_node).await?;
-
-            let fetch_response = con
-                .request(crate::rpc::message::FetchLogRequest::new(fetch_request))
-                .await?;
-
-            for pb_fetch_log_resp in fetch_response.tables_resp {
-                let table_id = pb_fetch_log_resp.table_id;
-                let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
-                for fetch_log_for_bucket in fetch_log_for_buckets {
-                    let bucket: i32 = fetch_log_for_bucket.bucket_id;
-                    let table_bucket = TableBucket::new(table_id, bucket);
-
-                    // Check if this is a remote log fetch
-                    if let Some(ref remote_log_fetch_info) =
-                        fetch_log_for_bucket.remote_log_fetch_info
-                    {
-                        let remote_fs_props = self
-                            .credentials_cache
-                            .get_or_refresh(&self.conns, &self.metadata)
-                            .await?;
-                        self.remote_log_downloader
-                            .set_remote_fs_props(remote_fs_props);
-                        let remote_fetch_info = RemoteLogFetchInfo::from_proto(
-                            remote_log_fetch_info,
-                            table_bucket.clone(),
-                        )?;
+            // Mark this node as having an in-flight fetch request; prepare_fetch_log_requests skips such nodes
+            {
+                self.nodes_with_pending_fetch_requests.lock()
+                    .insert(leader);
+            }
+
+            let cluster = self.metadata.get_cluster().clone();
+
+            let conns = Arc::clone(&self.conns);
+            let log_fetch_buffer = self.log_fetch_buffer.clone();
+            let log_scanner_status = self.log_scanner_status.clone();
+            let read_context = self.read_context.clone();
+            let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
+            let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+
+            // Spawn async task to handle the fetch request
+            tokio::spawn(async move {
+                let server_node = cluster
+                    .get_tablet_server(leader)
+                    .expect("todo: handle leader not exist.").clone();
+                let result = conns.get_connection(&server_node).await;
+                match result {
+                    Ok(con) => {
+                        let fetch_result = con
+                            .request(message::FetchLogRequest::new(fetch_request))
+                            .await;
+
+                        match fetch_result {
+                            Ok(fetch_response) => {
+                                Self::handle_fetch_response(
+                                    fetch_response,
+                                    &log_fetch_buffer,
+                                    &log_scanner_status,
+                                    &read_context,
+                                    &remote_log_downloader,
+                                ).await;
+                            }
+                            Err(e) => {
+                                warn!("Failed to fetch log from destination node {:?}: {:?}",
+                                    server_node,
+                                    e);
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        warn!("Failed to get connection to destination node: {:?}", e);
+                    }
+                }
+
+                // Remove from pending set
+                nodes_with_pending.lock().remove(&leader);
+            });
+        }
+
+        Ok(())
+    }
+
+    /// Handle fetch response and add completed fetches to buffer
+    async fn handle_fetch_response(
+        fetch_response: crate::proto::FetchLogResponse,
+        log_fetch_buffer: &Arc<LogFetchBuffer>,
+        log_scanner_status: &Arc<LogScannerStatus>,
+        read_context: &ReadContext,
+        remote_log_downloader: &Arc<RemoteLogDownloader>,
+    ) {
+        for pb_fetch_log_resp in fetch_response.tables_resp {
+            let table_id = pb_fetch_log_resp.table_id;
+            let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
+
+            for fetch_log_for_bucket in fetch_log_for_buckets {
+                let bucket: i32 = fetch_log_for_bucket.bucket_id;
+                let table_bucket = TableBucket::new(table_id, bucket);
+
+                // Check if this is a remote log fetch
+                if let Some(ref remote_log_fetch_info) =
+                    fetch_log_for_bucket.remote_log_fetch_info
+                {
+
+                    let remote_fetch_info = match RemoteLogFetchInfo::from_proto(
+                        remote_log_fetch_info,
+                        table_bucket.clone(),
+                    ) {
+                        Ok(info) => info,
+                        Err(e) => {
+                            eprintln!("Failed to parse remote log fetch info: {:?}", e);
+                            continue;
+                        }
+                    };
 
                         if let Some(fetch_offset) =
-                            self.log_scanner_status.get_bucket_offset(&table_bucket)
+                            log_scanner_status.get_bucket_offset(&table_bucket)
                         {
                             let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
                             // Download and process remote log segments
                             let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
                             let mut current_fetch_offset = fetch_offset;
-                            // todo: make segment download in parallel
+                            // todo: make segment downloads run in parallel
                             for (i, segment) in
                                 remote_fetch_info.remote_log_segments.iter().enumerate()
                             {
@@ -290,46 +388,115 @@ impl LogFetcher {
                                     current_fetch_offset = segment.start_offset;
                                 }
 
-                                let download_future =
-                                    self.remote_log_downloader.request_remote_log(
-                                        &remote_fetch_info.remote_log_tablet_dir,
-                                        segment,
-                                    )?;
-                                let pending_fetch = RemotePendingFetch::new(
-                                    segment.clone(),
-                                    download_future,
-                                    pos_in_log_segment,
-                                    current_fetch_offset,
-                                    high_watermark,
-                                    self.read_context.clone(),
-                                );
-                                let remote_records =
-                                    pending_fetch.convert_to_completed_fetch().await?;
-                                // Update offset and merge results
-                                for (tb, records) in remote_records {
-                                    if let Some(last_record) = records.last() {
-                                        self.log_scanner_status
-                                            .update_offset(&tb, last_record.offset() + 1);
-                                    }
-                                    result.entry(tb).or_default().extend(records);
-                                }
+                            let download_future = remote_log_downloader.request_remote_log(
+                                &remote_fetch_info.remote_log_tablet_dir,
+                                segment,
+                            );
+
+                            let table_bucket_clone = table_bucket.clone();
+
+                            // Register callback to be called when download completes
+                            // (similar to Java's downloadFuture.onComplete)
+                            // This must be done before creating RemotePendingFetch to avoid move issues
+                            let log_fetch_buffer_clone = Arc::clone(log_fetch_buffer);
+                            download_future.on_complete(move || {
+                                log_fetch_buffer_clone.try_complete(&table_bucket_clone);
+                            });
+
+                            let pending_fetch = RemotePendingFetch::new(
+                                segment.clone(),
+                                download_future,
+                                pos_in_log_segment,
+                                current_fetch_offset,
+                                high_watermark,
+                                read_context.clone(),
+                            );
+                            // Add to pending fetches in buffer (similar to Java's logFetchBuffer.pend)
+                            log_fetch_buffer.pend(Box::new(pending_fetch));
+                        }
+                    } else {
+                        // if the offset is None, it means the bucket has been unsubscribed,
+                        // skip processing and continue to the next bucket.
+                        continue;
+                    }
+                } else if fetch_log_for_bucket.records.is_some() {
+                    // Handle regular in-memory records - create completed fetch directly
+                    if let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) {
+                        match DefaultCompletedFetch::new(
+                            table_bucket.clone(),
+                            &fetch_log_for_bucket,
+                            read_context.clone(),
+                            fetch_offset,
+                        ) {
+                            Ok(completed_fetch) => {
+                                log_fetch_buffer.add(Box::new(completed_fetch));
+                            }
+                            Err(e) => {
+                                // todo: handle error
+                                eprintln!("Failed to create completed fetch: {:?}", e);
                             }
-                        } else {
-                            // if the offset is null, it means the bucket has 
been unsubscribed,
-                            // skip processing and continue to the next bucket.
-                            continue;
                         }
-                    } else if fetch_log_for_bucket.records.is_some() {
-                        // Handle regular in-memory records
-                        let mut fetch_records = vec![];
-                        let data = fetch_log_for_bucket.records.unwrap();
-                        for log_record in &mut LogRecordsBatchs::new(&data) {
-                            let last_offset = log_record.last_log_offset();
-                            fetch_records.extend(log_record.records(&self.read_context)?);
-                            self.log_scanner_status
-                                .update_offset(&table_bucket, last_offset + 1);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Collect completed fetches from buffer
+    /// Reference: LogFetchCollector.collectFetch in Java
+    fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+        const MAX_POLL_RECORDS: usize = 500; // Default max poll records
+        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+        let mut records_remaining = MAX_POLL_RECORDS;
+
+        while records_remaining > 0 {
+            // Get the next in line fetch, or get a new one from buffer
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            if next_in_line.is_none() || next_in_line.as_ref().unwrap().is_consumed() {
+                // Get a new fetch from buffer
+                if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                    // Initialize the fetch if not already initialized
+                    if !completed_fetch.is_initialized() {
+                        let size_in_bytes = completed_fetch.size_in_bytes();
+                        match self.initialize_fetch(completed_fetch) {
+                            Ok(initialized) => {
+                                self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                continue;
+                            }
+                            Err(e) => {
+                                // Remove a completed fetch upon a parse exception if
+                                // (1) it contains no records, and
+                                // (2) there are no fetched records with actual content preceding this
+                                // exception.
+                                if result.is_empty() && size_in_bytes == 0 {
+                                    // todo: consider it?
+                                    // self.log_fetch_buffer.poll();
+                                }
+                                return Err(e);
+                            }
                         }
-                        result.insert(table_bucket, fetch_records);
+                    } else {
+                        self.log_fetch_buffer.set_next_in_line_fetch(Some(completed_fetch));
+                    }
+                    // Note: poll() already removed the fetch from the buffer, so there is
+                    // no need to remove it again here.
+                } else {
+                    // No more fetches available
+                    break;
+                }
+            } else {
+                // Fetch records from next_in_line
+                if let Some(mut next_fetch) = next_in_line {
+                    let records = self.fetch_records_from_fetch(&mut next_fetch, records_remaining)?;
+
+                    if !records.is_empty() {
+                        let table_bucket = next_fetch.table_bucket().clone();
+                        // Merge with existing records for this bucket
+                        let existing = result.entry(table_bucket).or_default();
+                        let records_count = records.len();
+                        existing.extend(records);
+
+                        records_remaining = records_remaining.saturating_sub(records_count);
                     }
                 }
             }
@@ -338,6 +505,90 @@ impl LogFetcher {
         Ok(result)
     }
 
+    /// Initialize a completed fetch, checking offset match and updating high watermark
+    fn initialize_fetch(
+        &self,
+        mut completed_fetch: Box<dyn CompletedFetch>,
+    ) -> Result<Option<Box<dyn CompletedFetch>>> {
+
+        // todo: handle initialize failure
+
+        let table_bucket = completed_fetch.table_bucket().clone();
+        let fetch_offset = completed_fetch.fetch_offset();
+
+        // Check if bucket is still subscribed
+        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
+        if current_offset.is_none() {
+            warn!(
+                "Discarding stale fetch response for bucket {:?} since the bucket has been unsubscribed",
+                table_bucket
+            );
+            return Ok(None);
+        }
+
+        let current_offset = current_offset.unwrap();
+
+        // Check if offset matches
+        if fetch_offset != current_offset {
+            warn!(
+                "Discarding stale fetch response for bucket {:?} since its offset {} does not match the expected offset {}",
+                table_bucket, fetch_offset, current_offset
+            );
+            return Ok(None);
+        }
+
+        // Update high watermark
+        let high_watermark = completed_fetch.high_watermark();
+        if high_watermark >= 0 {
+            self.log_scanner_status.update_high_watermark(&table_bucket, high_watermark);
+        }
+
+        completed_fetch.set_initialized();
+        Ok(Some(completed_fetch))
+    }
+
+    /// Fetch records from a completed fetch, checking offset match
+    fn fetch_records_from_fetch(
+        &self,
+        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+        max_records: usize,
+    ) -> Result<Vec<ScanRecord>> {
+        let table_bucket = next_in_line_fetch.table_bucket().clone();
+        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+        if current_offset.is_none() {
+            warn!(
+                "Ignoring fetched records for {:?} since the bucket has been unsubscribed",
+                table_bucket
+            );
+            next_in_line_fetch.drain();
+            return Ok(Vec::new());
+        }
+
+        let current_offset = current_offset.unwrap();
+        let fetch_offset = next_in_line_fetch.fetch_offset();
+
+        // Check if this fetch is next in line
+        if fetch_offset == current_offset {
+            let records = next_in_line_fetch.fetch_records(max_records)?;
+            let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+            if next_fetch_offset > current_offset {
+                self.log_scanner_status.update_offset(&table_bucket, next_fetch_offset);
+            }
+
+            Ok(records)
+        } else {
+            // These records aren't next in line, ignore them
+            warn!(
+                "Ignoring fetched records for {:?} at offset {} since the current offset is {}",
+                table_bucket, fetch_offset, current_offset
+            );
+            next_in_line_fetch.drain();
+            Ok(Vec::new())
+        }
+    }
+
     async fn prepare_fetch_log_requests(&self) -> HashMap<i32, FetchLogRequest> {
         let mut fetch_log_req_for_buckets = HashMap::new();
         let mut table_id = None;
@@ -356,19 +607,22 @@ impl LogFetcher {
             };
 
             if let Some(leader) = self.get_table_bucket_leader(&bucket) {
-                let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
-                    partition_id: None,
-                    bucket_id: bucket.bucket_id(),
-                    fetch_offset: offset,
-                    // 1M
-                    max_fetch_bytes: 1024 * 1024,
-                };
+                if !self.nodes_with_pending_fetch_requests.lock()
+                    .contains(&leader) {
+                    let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
+                        partition_id: None,
+                        bucket_id: bucket.bucket_id(),
+                        fetch_offset: offset,
+                        // 1M
+                        max_fetch_bytes: 1024 * 1024,
+                    };
 
-                fetch_log_req_for_buckets
-                    .entry(leader)
-                    .or_insert_with(Vec::new)
-                    .push(fetch_log_req_for_bucket);
-                ready_for_fetch_count += 1;
+                    fetch_log_req_for_buckets
+                        .entry(leader)
+                        .or_insert_with(Vec::new)
+                        .push(fetch_log_req_for_bucket);
+                    ready_for_fetch_count += 1;
+                }
             }
         }
 
@@ -405,8 +659,10 @@ impl LogFetcher {
     }
 
     fn fetchable_buckets(&self) -> Vec<TableBucket> {
-        // always available now
-        self.log_scanner_status.fetchable_buckets(|_| true)
+        // Get buckets that are not already in the buffer
+        let buffered = self.log_fetch_buffer.buffered_buckets();
+        let buffered_set: HashSet<TableBucket> = buffered.into_iter().collect();
+        self.log_scanner_status.fetchable_buckets(|tb| !buffered_set.contains(tb))
     }
 
     fn get_table_bucket_leader(&self, tb: &TableBucket) -> Option<i32> {
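Note on the fetch scheduling above: send_fetches() marks a leader node as having an in-flight request before spawning the RPC task, prepare_fetch_log_requests() skips buckets whose leader is still marked, and the spawned task clears the mark once the response has been handled. A condensed sketch of that one-request-per-node guard follows; the names are hypothetical and not the crate's API.

use parking_lot::Mutex;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

// Placeholder for the real FetchLog RPC; assumed to complete eventually.
async fn fetch_from(node: i32) {
    tokio::time::sleep(Duration::from_millis(5)).await;
    println!("fetched from node {node}");
}

#[tokio::main]
async fn main() {
    let in_flight: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
    // Leaders computed for the fetchable buckets; node 1 appears twice,
    // so the second request to it is skipped this round.
    let leaders = [1, 2, 1];

    for leader in leaders {
        // HashSet::insert returns false if the node already has a request in flight.
        if !in_flight.lock().insert(leader) {
            continue;
        }
        let in_flight = Arc::clone(&in_flight);
        tokio::spawn(async move {
            fetch_from(leader).await;
            // Release the node so the next poll round may fetch from it again.
            in_flight.lock().remove(&leader);
        });
    }

    tokio::time::sleep(Duration::from_millis(20)).await;
}
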
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index a058bfe..a54f469 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -148,7 +148,7 @@ mod table_test {
         }
 
         let scan_records = log_scanner
-            .poll(std::time::Duration::from_secs(5))
+            .poll(std::time::Duration::from_secs(60))
             .await
             .expect("Failed to poll");
 

