This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch improve-log-poll
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
commit 3bd59edc14997bd41691e657c73909e4d2df4f9f
Author: luoyuxia <[email protected]>
AuthorDate: Mon Dec 15 09:28:05 2025 +0800

    improve log poll
---
 crates/fluss/src/client/table/log_fetch_buffer.rs | 417 ++++++++++++++++++++
 crates/fluss/src/client/table/mod.rs              |   1 +
 crates/fluss/src/client/table/remote_log.rs       | 141 +++++--
 crates/fluss/src/client/table/scanner.rs          | 452 +++++++++++++++++-----
 crates/fluss/tests/integration/table.rs           |   2 +-
 5 files changed, 882 insertions(+), 131 deletions(-)

diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
new file mode 100644
index 0000000..eca4de9
--- /dev/null
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -0,0 +1,417 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{Error, Result};
+use crate::metadata::TableBucket;
+use crate::proto::PbFetchLogRespForBucket;
+use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
+use parking_lot::Mutex;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::Notify;
+
+/// Represents a completed fetch that can be consumed
+pub trait CompletedFetch: Send + Sync {
+    fn table_bucket(&self) -> &TableBucket;
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+    fn next_fetch_offset(&self) -> i64;
+    fn is_consumed(&self) -> bool;
+    fn drain(&mut self);
+    fn size_in_bytes(&self) -> usize;
+    fn high_watermark(&self) -> i64;
+    fn is_initialized(&self) -> bool;
+    fn set_initialized(&mut self);
+    fn fetch_offset(&self) -> i64;
+}
+
+/// Represents a pending fetch that is waiting to be completed
+pub trait PendingFetch: Send + Sync {
+    fn table_bucket(&self) -> &TableBucket;
+    fn is_completed(&self) -> bool;
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>>;
+}
+
+/// Thread-safe buffer for completed fetches
+pub struct LogFetchBuffer {
+    completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
+    pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn PendingFetch>>>>,
+    next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
+    not_empty_notify: Notify,
+    woken_up: Arc<AtomicBool>,
+}
+
+impl LogFetchBuffer {
+    pub fn new() -> Self {
+        Self {
+            completed_fetches: Mutex::new(VecDeque::new()),
+            pending_fetches: Mutex::new(HashMap::new()),
+            next_in_line_fetch: Mutex::new(None),
+            not_empty_notify: Notify::new(),
+            woken_up: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    /// Check if the buffer is empty
+    pub fn is_empty(&self) -> bool {
+        self.completed_fetches.lock().is_empty()
+    }
+
+    /// Wait for the buffer to become non-empty, with timeout.
+    /// Returns true if data became available, false on timeout.
+    pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+        let deadline = std::time::Instant::now() + timeout;
+
+        loop {
+            // Check if buffer is not empty
+            if !self.is_empty() {
+                return true;
+            }
+
+            // Check if woken up
+            if self.woken_up.swap(false, Ordering::Acquire) {
+                return true;
+            }
+
+            // Check if timed out
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                return false;
+            }
+
+            // Wait for notification with the remaining time
+            let remaining = deadline - now;
+            let notified = self.not_empty_notify.notified();
+            tokio::select! {
+                _ = tokio::time::sleep(remaining) => {
+                    return false; // Timeout
+                }
+                _ = notified => {
+                    // Got notification, check again
+                    continue;
+                }
+            }
+        }
+    }
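A note on the wait loop above: `await_not_empty` checks emptiness first and only then creates the `notified()` future, so a `notify_waiters()` that fires between the check and the registration is missed until the sleep expires; the deadline bounds the extra latency rather than eliminating it. The standalone sketch below — toy types, not this crate's API — shows the race-free ordering: create the `Notified` future before testing the condition (tokio guarantees a `Notified` future observes `notify_waiters()` from the moment it is created).

    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;
    use tokio::sync::Notify;

    /// Wait until `flag` is set or `timeout` elapses; true means the flag was seen.
    async fn wait_for_flag(flag: Arc<AtomicBool>, notify: Arc<Notify>, timeout: Duration) -> bool {
        let deadline = tokio::time::Instant::now() + timeout;
        loop {
            // Register interest BEFORE checking the condition, so a wakeup
            // arriving right after the check is still buffered for us.
            let notified = notify.notified();
            if flag.load(Ordering::Acquire) {
                return true;
            }
            if tokio::time::Instant::now() >= deadline {
                return false;
            }
            tokio::select! {
                _ = tokio::time::sleep_until(deadline) => return false,
                _ = notified => {} // woken; loop and re-check the condition
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (flag, notify) = (Arc::new(AtomicBool::new(false)), Arc::new(Notify::new()));
        let (f, n) = (flag.clone(), notify.clone());
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            f.store(true, Ordering::Release);
            n.notify_waiters();
        });
        assert!(wait_for_flag(flag, notify, Duration::from_secs(1)).await);
    }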
+
+    /// Wake up any waiting threads
+    pub fn wakeup(&self) {
+        self.woken_up.store(true, Ordering::Release);
+        self.not_empty_notify.notify_waiters();
+    }
+
+    /// Add a pending fetch to the buffer
+    pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
+        let table_bucket = pending_fetch.table_bucket().clone();
+        self.pending_fetches
+            .lock()
+            .entry(table_bucket)
+            .or_insert_with(VecDeque::new)
+            .push_back(pending_fetch);
+    }
+
+    /// Try to complete pending fetches in order, converting them to completed fetches
+    pub fn try_complete(&self, table_bucket: &TableBucket) {
+        let mut pending_map = self.pending_fetches.lock();
+        if let Some(pendings) = pending_map.get_mut(table_bucket) {
+            while let Some(front) = pendings.front() {
+                if front.is_completed() {
+                    if let Some(pending) = pendings.pop_front() {
+                        match pending.to_completed_fetch() {
+                            Ok(completed) => {
+                                self.completed_fetches.lock().push_back(completed);
+                                // Signal that the buffer is not empty
+                                self.not_empty_notify.notify_waiters();
+                            }
+                            Err(_) => {
+                                // Skip failed fetches
+                            }
+                        }
+                    }
+                } else {
+                    break;
+                }
+            }
+            if pendings.is_empty() {
+                pending_map.remove(table_bucket);
+            }
+        }
+    }
+
+    /// Add a completed fetch to the buffer
+    pub fn add(&self, completed_fetch: Box<dyn CompletedFetch>) {
+        let table_bucket = completed_fetch.table_bucket().clone();
+        let mut pending_map = self.pending_fetches.lock();
+        let should_notify = if let Some(pendings) = pending_map.get_mut(&table_bucket) {
+            if pendings.is_empty() {
+                self.completed_fetches.lock().push_back(completed_fetch);
+                true
+            } else {
+                // Wrap it as a pending fetch so per-bucket order is preserved
+                let completed_pending = CompletedPendingFetch::new(completed_fetch);
+                pendings.push_back(Box::new(completed_pending));
+                false
+            }
+        } else {
+            self.completed_fetches.lock().push_back(completed_fetch);
+            true
+        };
+
+        // Signal that the buffer is not empty if we added to completed_fetches
+        if should_notify {
+            self.not_empty_notify.notify_waiters();
+        }
+    }
+
+    /// Poll the next completed fetch
+    pub fn poll(&self) -> Option<Box<dyn CompletedFetch>> {
+        self.completed_fetches.lock().pop_front()
+    }
+
+    /// Take the next-in-line fetch, if any
+    pub fn next_in_line_fetch(&self) -> Option<Box<dyn CompletedFetch>> {
+        self.next_in_line_fetch.lock().take()
+    }
+
+    /// Set the next-in-line fetch
+    pub fn set_next_in_line_fetch(&self, fetch: Option<Box<dyn CompletedFetch>>) {
+        *self.next_in_line_fetch.lock() = fetch;
+    }
+
+    /// Get the set of buckets that have buffered data
+    pub fn buffered_buckets(&self) -> Vec<TableBucket> {
+        let mut buckets = Vec::new();
+        let completed = self.completed_fetches.lock();
+        for fetch in completed.iter() {
+            buckets.push(fetch.table_bucket().clone());
+        }
+        let pending = self.pending_fetches.lock();
+        buckets.extend(pending.keys().cloned());
+        buckets
+    }
+}
+
+impl Default for LogFetchBuffer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Pending fetch that waits for a fetch log response
+pub struct FetchPendingFetch {
+    table_bucket: TableBucket,
+    response: Arc<Mutex<Option<Result<PbFetchLogRespForBucket>>>>,
+    read_context: ReadContext,
+    fetch_offset: i64,
+}
+
+impl FetchPendingFetch {
+    pub fn new(
+        table_bucket: TableBucket,
+        read_context: ReadContext,
+        fetch_offset: i64,
+    ) -> (Self, Arc<Mutex<Option<Result<PbFetchLogRespForBucket>>>>) {
+        let response = Arc::new(Mutex::new(None));
+        let pending = Self {
+            table_bucket,
+            response: Arc::clone(&response),
+            read_context,
+            fetch_offset,
+        };
+        (pending, response)
+    }
+
+    pub fn set_response(&self, response: Result<PbFetchLogRespForBucket>) {
+        *self.response.lock() = Some(response);
+    }
+}
+
+impl PendingFetch for FetchPendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.table_bucket
+    }
+
+    fn is_completed(&self) -> bool {
+        self.response.lock().is_some()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        let response = self
+            .response
+            .lock()
+            .take()
+            .ok_or_else(|| Error::Io(std::io::Error::other("Fetch response not available")))??;
+        let completed = DefaultCompletedFetch::new(
+            self.table_bucket,
+            &response,
+            self.read_context,
+            self.fetch_offset,
+        )?;
+        Ok(Box::new(completed))
+    }
+}
+
+/// A wrapper that makes a completed fetch look like a pending fetch
+struct CompletedPendingFetch {
+    completed_fetch: Box<dyn CompletedFetch>,
+}
+
+impl CompletedPendingFetch {
+    fn new(completed_fetch: Box<dyn CompletedFetch>) -> Self {
+        Self { completed_fetch }
+    }
+}
+
+impl PendingFetch for CompletedPendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        self.completed_fetch.table_bucket()
+    }
+
+    fn is_completed(&self) -> bool {
+        true
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        Ok(self.completed_fetch)
+    }
+}
+
+/// Default implementation of CompletedFetch for in-memory log records
+pub struct DefaultCompletedFetch {
+    table_bucket: TableBucket,
+    data: Vec<u8>,
+    read_context: ReadContext,
+    fetch_offset: i64, // The offset at which this fetch started
+    next_fetch_offset: i64,
+    high_watermark: i64,
+    size_in_bytes: usize,
+    consumed: bool,
+    initialized: bool,
+    // Pre-parsed records for efficient access
+    records: Vec<ScanRecord>,
+    current_index: usize,
+}
+
+impl DefaultCompletedFetch {
+    pub fn new(
+        table_bucket: TableBucket,
+        fetch_response: &PbFetchLogRespForBucket,
+        read_context: ReadContext,
+        fetch_offset: i64,
+    ) -> Result<Self> {
+        let data = fetch_response.records.clone().unwrap_or_default();
+        let size_in_bytes = data.len();
+        let high_watermark = fetch_response.high_watermark.unwrap_or(-1);
+
+        // Parse all records upfront
+        let mut records = Vec::new();
+        for log_record in &mut LogRecordsBatchs::new(&data) {
+            records.extend(log_record.records(&read_context)?);
+        }
+
+        // Derive next_fetch_offset from the last record if available
+        let next_fetch_offset = if let Some(last_record) = records.last() {
+            last_record.offset() + 1
+        } else {
+            fetch_offset
+        };
+
+        Ok(Self {
+            table_bucket,
+            data,
+            read_context,
+            fetch_offset,
+            next_fetch_offset,
+            high_watermark,
+            size_in_bytes,
+            consumed: false,
+            initialized: false,
+            records,
+            current_index: 0,
+        })
+    }
+}
+
+impl CompletedFetch for DefaultCompletedFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.table_bucket
+    }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
+        if self.consumed {
+            return Ok(Vec::new());
+        }
+
+        let end_index = std::cmp::min(self.current_index + max_records, self.records.len());
+        let records = self.records[self.current_index..end_index].to_vec();
+        self.current_index = end_index;
+
+        if self.current_index >= self.records.len() {
+            self.consumed = true;
+            // Update next_fetch_offset based on the last record
+            if let Some(last_record) = self.records.last() {
+                self.next_fetch_offset = last_record.offset() + 1;
+            }
+        } else if let Some(last_record) = records.last() {
+            // Update next_fetch_offset as we consume records
+            self.next_fetch_offset = last_record.offset() + 1;
+        }
+
+        Ok(records)
+    }
+
+    fn next_fetch_offset(&self) -> i64 {
+        self.next_fetch_offset
+    }
+
+    fn is_consumed(&self) -> bool {
+        self.consumed
+    }
+
+    fn drain(&mut self) {
+        self.consumed = true;
+        self.current_index = self.records.len();
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.size_in_bytes
+    }
+
+    fn high_watermark(&self) -> i64 {
+        self.high_watermark
+    }
+
+    fn is_initialized(&self) -> bool {
+        self.initialized
+    }
+
+    fn set_initialized(&mut self) {
+        self.initialized = true;
+    }
+
+    fn fetch_offset(&self) -> i64 {
+        self.fetch_offset
+    }
+}
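The subtle invariant in the buffer above is per-bucket ordering: `add()` must not let a freshly completed fetch overtake an older, still-pending fetch for the same bucket (say, a remote segment mid-download), and `try_complete()` promotes pending fetches strictly front-to-back. A toy model of just that rule, with integers for buckets and strings for fetches instead of the crate's types:

    use std::collections::{HashMap, VecDeque};

    #[derive(Default)]
    struct ToyBuffer {
        completed: VecDeque<&'static str>,
        pending: HashMap<u32, VecDeque<(&'static str, bool)>>, // (fetch, is_done)
    }

    impl ToyBuffer {
        /// A fetch whose data is not ready yet queues behind its bucket.
        fn pend(&mut self, bucket: u32, fetch: &'static str) {
            self.pending.entry(bucket).or_default().push_back((fetch, false));
        }

        /// A finished fetch queues behind earlier pending fetches for the same
        /// bucket; otherwise its records would surface out of order.
        fn add(&mut self, bucket: u32, fetch: &'static str) {
            match self.pending.get_mut(&bucket) {
                Some(q) if !q.is_empty() => q.push_back((fetch, true)),
                _ => self.completed.push_back(fetch),
            }
        }

        /// Promote finished pending fetches, strictly FIFO per bucket.
        fn try_complete(&mut self, bucket: u32) {
            if let Some(q) = self.pending.get_mut(&bucket) {
                while matches!(q.front(), Some((_, true))) {
                    let (fetch, _) = q.pop_front().unwrap();
                    self.completed.push_back(fetch);
                }
            }
        }
    }

    fn main() {
        let mut buf = ToyBuffer::default();
        buf.pend(0, "remote-segment"); // still downloading
        buf.add(0, "in-memory-fetch"); // done, but must wait its turn
        assert!(buf.completed.is_empty());
        buf.pending.get_mut(&0).unwrap().front_mut().unwrap().1 = true; // download done
        buf.try_complete(0);
        assert_eq!(buf.completed, ["remote-segment", "in-memory-fetch"]);
    }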
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index 9972247..e2cf9e6 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -26,6 +26,7 @@
 pub const EARLIEST_OFFSET: i64 = -2;
 
 mod append;
+mod log_fetch_buffer;
 mod remote_log;
 mod scanner;
 mod writer;
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
index 10273dd..4c835bb 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -14,16 +14,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use crate::client::table::log_fetch_buffer::{CompletedFetch, DefaultCompletedFetch, PendingFetch};
 use crate::error::{Error, Result};
 use crate::io::{FileIO, Storage};
 use crate::metadata::TableBucket;
+use crate::proto::PbFetchLogRespForBucket;
 use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
+use crate::record::ReadContext;
 use crate::util::delete_file;
-use parking_lot::RwLock;
-use std::collections::HashMap;
+use parking_lot::Mutex;
 use std::io;
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 use tempfile::TempDir;
 use tokio::io::AsyncWriteExt;
 use tokio::sync::oneshot;
@@ -86,15 +88,67 @@ impl RemoteLogFetchInfo {
     }
 }
 
+type CompletionCallback = Box<dyn Fn() + Send + Sync>;
+
 /// Future for a remote log download request
 pub struct RemoteLogDownloadFuture {
-    receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
+    result: Arc<Mutex<Option<Result<PathBuf>>>>,
+    completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
}
 
 impl RemoteLogDownloadFuture {
     pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self {
+        let result = Arc::new(Mutex::new(None));
+        let result_clone = Arc::clone(&result);
+        let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> =
+            Arc::new(Mutex::new(Vec::new()));
+        let callbacks_clone = Arc::clone(&completion_callbacks);
+
+        // Spawn a task that waits for the download, stores the result, then
+        // runs the registered callbacks
+        tokio::spawn(async move {
+            let download_result = match receiver.await {
+                Ok(Ok(path)) => Ok(path),
+                Ok(Err(e)) => Err(e),
+                Err(e) => Err(Error::Io(io::Error::other(format!(
+                    "Download future cancelled: {e:?}"
+                )))),
+            };
+            *result_clone.lock() = Some(download_result);
+
+            // Take the callbacks out so the lock is not held while they run.
+            // From here on is_done() returns true, so callbacks registered
+            // later are invoked immediately in on_complete().
+            let callbacks: Vec<CompletionCallback> = {
+                let mut callbacks_guard = callbacks_clone.lock();
+                std::mem::take(&mut *callbacks_guard)
+            };
+            for callback in callbacks {
+                callback();
+            }
+        });
+
         Self {
-            receiver: Some(receiver),
+            result,
+            completion_callbacks,
         }
     }
+
+    /// Register a callback to run when the download completes
+    /// (similar to Java's onComplete)
+    pub fn on_complete<F>(&self, callback: F)
+    where
+        F: Fn() + Send + Sync + 'static,
+    {
+        // Check completion while holding the callbacks lock, to avoid racing
+        // with the completion task, which drains the list under the same lock
+        let mut callbacks_guard = self.completion_callbacks.lock();
+        if self.is_done() {
+            // Already completed: drop the lock first, then call immediately
+            drop(callbacks_guard);
+            callback();
+        } else {
+            // Otherwise, register the callback
+            callbacks_guard.push(Box::new(callback));
+        }
+    }
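The register-or-run-now handshake in `on_complete` works because completion state is checked under the same lock through which the completion task drains the callback list: a concurrently registered callback either lands in the drained batch or observes the done flag and runs immediately, so none are lost. The pattern reduced to a self-contained toy (not the real `RemoteLogDownloadFuture`):

    use parking_lot::Mutex;

    type Callback = Box<dyn Fn() + Send + Sync>;

    struct Completion {
        done: Mutex<bool>, // stands in for result: Mutex<Option<Result<PathBuf>>>
        callbacks: Mutex<Vec<Callback>>,
    }

    impl Completion {
        fn on_complete(&self, cb: impl Fn() + Send + Sync + 'static) {
            let mut guard = self.callbacks.lock();
            if *self.done.lock() {
                drop(guard); // never run user code while holding the registry lock
                cb();
            } else {
                guard.push(Box::new(cb));
            }
        }

        fn complete(&self) {
            *self.done.lock() = true;
            // Drain under the same lock on_complete() registers with: a racing
            // on_complete() either lands in this batch or sees done == true.
            let cbs = std::mem::take(&mut *self.callbacks.lock());
            for cb in cbs {
                cb();
            }
        }
    }

    fn main() {
        let c = Completion { done: Mutex::new(false), callbacks: Mutex::new(Vec::new()) };
        c.on_complete(|| println!("registered early, runs on complete()"));
        c.complete();
        c.on_complete(|| println!("registered late, runs immediately"));
    }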
@@ -135,7 +189,7 @@
         &self,
         remote_log_tablet_dir: &str,
         segment: &RemoteLogSegment,
-    ) -> Result<RemoteLogDownloadFuture> {
+    ) -> RemoteLogDownloadFuture {
         let (sender, receiver) = oneshot::channel();
         let local_file_name = segment.local_file_name();
         let local_file_path = self.local_log_dir.path().join(&local_file_name);
@@ -144,16 +198,11 @@ impl RemoteLogDownloader {
         let remote_fs_props = self.remote_fs_props.read().clone();
         // Spawn async download task
         tokio::spawn(async move {
-            let result = Self::download_file(
-                &remote_log_tablet_dir,
-                &remote_path,
-                &local_file_path,
-                &remote_fs_props,
-            )
-            .await;
+            let result =
+                Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await;
             let _ = sender.send(result);
         });
-        Ok(RemoteLogDownloadFuture::new(receiver))
+        RemoteLogDownloadFuture::new(receiver)
     }
 
     /// Build the remote path for a log segment
@@ -256,14 +305,13 @@
     }
 }
 
+/// Pending fetch that waits for the remote log file to be downloaded
 pub struct RemotePendingFetch {
     segment: RemoteLogSegment,
     download_future: RemoteLogDownloadFuture,
     pos_in_log_segment: i32,
-    #[allow(dead_code)]
     fetch_offset: i64,
-    #[allow(dead_code)]
     high_watermark: i64,
     read_context: ReadContext,
 }
@@ -286,13 +334,26 @@ impl RemotePendingFetch {
             read_context,
         }
     }
+}
 
-    /// Convert to completed fetch by reading the downloaded file
-    pub async fn convert_to_completed_fetch(
-        mut self,
-    ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
-        let file_path = self.download_future.get_file_path().await?;
-        let file_data = tokio::fs::read(&file_path).await?;
+impl PendingFetch for RemotePendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.segment.table_bucket
+    }
+
+    fn is_completed(&self) -> bool {
+        self.download_future.is_done()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        // Get the file path (this should only be called when is_completed() returns true)
+        let file_path = self.download_future.get_file_path()?;
+
+        // Read the file data synchronously; we are in a sync context here.
+        // Note: this is a known limitation - blocking I/O on the calling thread.
+        let file_data = std::fs::read(&file_path).map_err(|e| {
+            Error::Io(io::Error::other(format!(
+                "Failed to read downloaded file: {e:?}"
+            )))
+        })?;
 
         // Slice the data if needed
         let data = if self.pos_in_log_segment > 0 {
@@ -301,17 +362,33 @@ impl RemotePendingFetch {
             &file_data
         };
 
-        // delete the downloaded local file to free disk
-        delete_file(file_path).await;
+        // Build a PbFetchLogRespForBucket from the downloaded data so it can
+        // be turned into a DefaultCompletedFetch
+        let fetch_response = PbFetchLogRespForBucket {
+            bucket_id: self.segment.table_bucket.bucket_id(),
+            partition_id: None,
+            error_code: None,
+            error_message: None,
+            high_watermark: Some(self.high_watermark),
+            log_start_offset: None,
+            remote_log_fetch_info: None,
+            records: Some(data.to_vec()),
+        };
 
-        // Parse log records (remote log contains full data, need client-side projection)
-        let mut fetch_records = vec![];
-        for log_record in &mut LogRecordsBatchs::new(data) {
-            fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
-        }
+        // Create DefaultCompletedFetch from the data
+        let completed_fetch = DefaultCompletedFetch::new(
+            self.segment.table_bucket,
+            &fetch_response,
+            self.read_context,
+            self.fetch_offset,
+        )?;
+
+        // Delete the downloaded local file in the background to free disk space
+        tokio::spawn(async move {
+            let _ = delete_file(file_path).await;
+        });
 
-        let mut result = HashMap::new();
-        result.insert(self.segment.table_bucket.clone(), fetch_records);
-        Ok(result)
+        Ok(Box::new(completed_fetch))
     }
 }
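`to_completed_fetch` runs in a synchronous context, so the code above falls back to a blocking `std::fs::read` and says so in a comment. A possible follow-up — an assumption here, not something this commit implements — is to hand the read to tokio's blocking pool inside the download task, so that by the time the pending fetch completes the bytes are already in memory. A minimal sketch (`read_segment` is a hypothetical helper, not crate API):

    use std::path::PathBuf;

    /// Read a downloaded segment without stalling async worker threads.
    async fn read_segment(path: PathBuf) -> std::io::Result<Vec<u8>> {
        // spawn_blocking moves the blocking read onto tokio's blocking pool
        tokio::task::spawn_blocking(move || std::fs::read(&path))
            .await
            .map_err(|e| std::io::Error::other(format!("join error: {e}")))?
    }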
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 1e70649..5053865 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -21,17 +21,20 @@
 use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
-use crate::rpc::RpcClient;
+use crate::record::{to_arrow_schema, ReadContext, ScanRecord, ScanRecords};
+use crate::rpc::{message, RpcClient};
 use crate::util::FairBucketStatusMap;
 use arrow_schema::SchemaRef;
-use parking_lot::RwLock;
-use std::collections::HashMap;
+use parking_lot::{Mutex, RwLock};
+use std::collections::{HashMap, HashSet};
 use std::slice::from_ref;
 use std::sync::Arc;
 use std::time::Duration;
 use tempfile::TempDir;
-
+use log::warn;
+
+use crate::client::table::log_fetch_buffer::{
+    CompletedFetch, DefaultCompletedFetch, LogFetchBuffer,
+};
 use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
@@ -171,8 +174,39 @@ impl LogScanner {
         })
     }
 
-    pub async fn poll(&self, _timeout: Duration) -> Result<ScanRecords> {
-        Ok(ScanRecords::new(self.poll_for_fetches().await?))
+    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+        let start = std::time::Instant::now();
+        let deadline = start + timeout;
+
+        loop {
+            // Try to collect fetches
+            let fetch_result = self.poll_for_fetches().await?;
+
+            if !fetch_result.is_empty() {
+                // We have data: send the next round of fetches and return.
+                // This enables pipelining while the user processes the data.
+                self.log_fetcher.send_fetches().await?;
+                return Ok(ScanRecords::new(fetch_result));
+            }
+
+            // No data available, check if we should wait
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                // Timeout reached, return an empty result
+                return Ok(ScanRecords::new(HashMap::new()));
+            }
+
+            // Wait for the buffer to become non-empty with the remaining time
+            let remaining = deadline - now;
+            let has_data = self.log_fetcher.log_fetch_buffer.await_not_empty(remaining).await;
+
+            if !has_data {
+                // Timed out while waiting
+                return Ok(ScanRecords::new(HashMap::new()));
+            }
+
+            // Buffer became non-empty, try again
+        }
     }
 
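With the reworked `poll` — block up to the timeout, return as soon as buffered fetches are available, and start the next fetch round before handing records back — a consumer loop looks like the sketch below. The setup and the exact `ScanRecords` iteration API are assumed (loosely modeled on the integration test at the end of this mail), not defined by this commit:

    use std::time::Duration;

    // `scanner` is a LogScanner built elsewhere; `Result` is the crate's.
    async fn consume(scanner: &LogScanner) -> Result<()> {
        scanner.subscribe(0, 0).await?; // bucket 0, from offset 0
        loop {
            // Empty result after 60s of no data; otherwise returns early,
            // with the next fetch round already in flight.
            let records = scanner.poll(Duration::from_secs(60)).await?;
            for record in records {
                // process the ScanRecord (assumes IntoIterator on ScanRecords)
                let _ = record;
            }
        }
    }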
     pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
@@ -186,11 +220,14 @@ impl LogScanner {
     }
 
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
-        self.log_fetcher.send_fetches_and_collect().await
+        // Send fetch requests (non-blocking)
+        self.log_fetcher.send_fetches().await?;
+
+        // Collect completed fetches from the buffer
+        self.log_fetcher.collect_fetches()
     }
 }
 
-#[allow(dead_code)]
 struct LogFetcher {
     table_path: TablePath,
     conns: Arc<RpcClient>,
@@ -198,8 +235,10 @@ struct LogFetcher {
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     read_context: ReadContext,
-    remote_log_downloader: RemoteLogDownloader,
+    remote_log_downloader: Arc<RemoteLogDownloader>,
     credentials_cache: CredentialsCache,
+    log_fetch_buffer: Arc<LogFetchBuffer>,
+    nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
 }
 
 impl LogFetcher {
@@ -222,8 +261,10 @@ impl LogFetcher {
             metadata,
             log_scanner_status,
             read_context,
-            remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
+            remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?),
             credentials_cache: CredentialsCache::new(),
+            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
         })
     }
 
@@ -237,51 +278,108 @@ impl LogFetcher {
         }
     }
 
-    async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+    /// Send fetch requests asynchronously without waiting for responses
+    async fn send_fetches(&self) -> Result<()> {
         let fetch_request = self.prepare_fetch_log_requests().await;
-        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
         for (leader, fetch_request) in fetch_request {
-            let cluster = self.metadata.get_cluster();
-            let server_node = cluster
-                .get_tablet_server(leader)
-                .expect("todo: handle leader not exist.");
-            let con = self.conns.get_connection(server_node).await?;
-
-            let fetch_response = con
-                .request(crate::rpc::message::FetchLogRequest::new(fetch_request))
-                .await?;
-
-            for pb_fetch_log_resp in fetch_response.tables_resp {
-                let table_id = pb_fetch_log_resp.table_id;
-                let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
-                for fetch_log_for_bucket in fetch_log_for_buckets {
-                    let bucket: i32 = fetch_log_for_bucket.bucket_id;
-                    let table_bucket = TableBucket::new(table_id, bucket);
-
-                    // Check if this is a remote log fetch
-                    if let Some(ref remote_log_fetch_info) =
-                        fetch_log_for_bucket.remote_log_fetch_info
-                    {
-                        let remote_fs_props = self
-                            .credentials_cache
-                            .get_or_refresh(&self.conns, &self.metadata)
-                            .await?;
-                        self.remote_log_downloader
-                            .set_remote_fs_props(remote_fs_props);
-                        let remote_fetch_info = RemoteLogFetchInfo::from_proto(
-                            remote_log_fetch_info,
-                            table_bucket.clone(),
-                        )?;
+            // Mark this node as having an in-flight fetch request
+            self.nodes_with_pending_fetch_requests.lock().insert(leader);
+
+            let cluster = self.metadata.get_cluster().clone();
+
+            let conns = Arc::clone(&self.conns);
+            let log_fetch_buffer = self.log_fetch_buffer.clone();
+            let log_scanner_status = self.log_scanner_status.clone();
+            let read_context = self.read_context.clone();
+            let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
+            let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+
+            // Spawn an async task to handle the fetch request
+            tokio::spawn(async move {
+                let server_node = cluster
+                    .get_tablet_server(leader)
+                    .expect("todo: handle leader not exist.")
+                    .clone();
+                let result = conns.get_connection(&server_node).await;
+                match result {
+                    Ok(con) => {
+                        let fetch_result =
+                            con.request(message::FetchLogRequest::new(fetch_request)).await;
+
+                        match fetch_result {
+                            Ok(fetch_response) => {
+                                Self::handle_fetch_response(
+                                    fetch_response,
+                                    &log_fetch_buffer,
+                                    &log_scanner_status,
+                                    &read_context,
+                                    &remote_log_downloader,
+                                )
+                                .await;
+                            }
+                            Err(e) => {
+                                warn!(
+                                    "Failed to fetch log from destination node {:?}: {:?}",
+                                    server_node, e
+                                );
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        warn!("Failed to get connection to destination node: {:?}", e);
+                    }
+                }
+
+                // Remove this node from the pending set
+                nodes_with_pending.lock().remove(&leader);
+            });
+        }
+
+        Ok(())
+    }
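One note on the in-flight bookkeeping in `send_fetches`: the membership test happens in `prepare_fetch_log_requests` while the `insert` happens here, in two separate critical sections, so two overlapping callers could both pass the check. `HashSet::insert` already reports whether the element was newly added, which makes check-and-mark a single atomic step; a standalone sketch of that variant (toy helper names, not crate API):

    use parking_lot::Mutex;
    use std::collections::HashSet;
    use std::sync::Arc;

    /// True means we acquired the slot: no request to `node` was in flight.
    fn try_begin(pending: &Arc<Mutex<HashSet<i32>>>, node: i32) -> bool {
        pending.lock().insert(node)
    }

    fn finish(pending: &Arc<Mutex<HashSet<i32>>>, node: i32) {
        pending.lock().remove(&node);
    }

    fn main() {
        let pending = Arc::new(Mutex::new(HashSet::new()));
        assert!(try_begin(&pending, 1));  // first request proceeds
        assert!(!try_begin(&pending, 1)); // duplicate is suppressed
        finish(&pending, 1);
        assert!(try_begin(&pending, 1));  // next round may fetch again
    }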
+
+    /// Handle a fetch response and add completed fetches to the buffer
+    async fn handle_fetch_response(
+        fetch_response: crate::proto::FetchLogResponse,
+        log_fetch_buffer: &Arc<LogFetchBuffer>,
+        log_scanner_status: &Arc<LogScannerStatus>,
+        read_context: &ReadContext,
+        remote_log_downloader: &Arc<RemoteLogDownloader>,
+    ) {
+        for pb_fetch_log_resp in fetch_response.tables_resp {
+            let table_id = pb_fetch_log_resp.table_id;
+            let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
+
+            for fetch_log_for_bucket in fetch_log_for_buckets {
+                let bucket: i32 = fetch_log_for_bucket.bucket_id;
+                let table_bucket = TableBucket::new(table_id, bucket);
+
+                // Check if this is a remote log fetch
+                if let Some(ref remote_log_fetch_info) =
+                    fetch_log_for_bucket.remote_log_fetch_info
+                {
+                    let remote_fetch_info = match RemoteLogFetchInfo::from_proto(
+                        remote_log_fetch_info,
+                        table_bucket.clone(),
+                    ) {
+                        Ok(info) => info,
+                        Err(e) => {
+                            warn!("Failed to parse remote log fetch info: {:?}", e);
+                            continue;
+                        }
+                    };
                     if let Some(fetch_offset) =
-                        self.log_scanner_status.get_bucket_offset(&table_bucket)
+                        log_scanner_status.get_bucket_offset(&table_bucket)
                     {
                         let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
                         // Download and process remote log segments
                         let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
                         let mut current_fetch_offset = fetch_offset;
-                        // todo: make segment download in parallel
+                        // todo: make segment downloads parallel
                         for (i, segment) in
                             remote_fetch_info.remote_log_segments.iter().enumerate()
                         {
@@ -290,46 +388,115 @@ impl LogFetcher {
                                 current_fetch_offset = segment.start_offset;
                             }
 
-                            let download_future =
-                                self.remote_log_downloader.request_remote_log(
-                                    &remote_fetch_info.remote_log_tablet_dir,
-                                    segment,
-                                )?;
-                            let pending_fetch = RemotePendingFetch::new(
-                                segment.clone(),
-                                download_future,
-                                pos_in_log_segment,
-                                current_fetch_offset,
-                                high_watermark,
-                                self.read_context.clone(),
-                            );
-                            let remote_records =
-                                pending_fetch.convert_to_completed_fetch().await?;
-                            // Update offset and merge results
-                            for (tb, records) in remote_records {
-                                if let Some(last_record) = records.last() {
-                                    self.log_scanner_status
-                                        .update_offset(&tb, last_record.offset() + 1);
-                                }
-                                result.entry(tb).or_default().extend(records);
-                            }
+                            let download_future = remote_log_downloader.request_remote_log(
+                                &remote_fetch_info.remote_log_tablet_dir,
+                                segment,
+                            );
+
+                            let table_bucket_clone = table_bucket.clone();
+
+                            // Register a callback for when the download completes
+                            // (similar to Java's downloadFuture.onComplete). This must
+                            // happen before the future moves into RemotePendingFetch.
+                            let log_fetch_buffer_clone = Arc::clone(log_fetch_buffer);
+                            download_future.on_complete(move || {
+                                log_fetch_buffer_clone.try_complete(&table_bucket_clone);
+                            });
+
+                            let pending_fetch = RemotePendingFetch::new(
+                                segment.clone(),
+                                download_future,
+                                pos_in_log_segment,
+                                current_fetch_offset,
+                                high_watermark,
+                                read_context.clone(),
+                            );
+                            // Park it as pending (similar to Java's logFetchBuffer.pend)
+                            log_fetch_buffer.pend(Box::new(pending_fetch));
+                        }
+                    } else {
+                        // if the offset is null, it means the bucket has been unsubscribed,
+                        // skip processing and continue to the next bucket.
+                        continue;
+                    }
+                } else if fetch_log_for_bucket.records.is_some() {
+                    // Handle regular in-memory records - create a completed fetch directly
+                    if let Some(fetch_offset) =
+                        log_scanner_status.get_bucket_offset(&table_bucket)
+                    {
+                        match DefaultCompletedFetch::new(
+                            table_bucket.clone(),
+                            &fetch_log_for_bucket,
+                            read_context.clone(),
+                            fetch_offset,
+                        ) {
+                            Ok(completed_fetch) => {
+                                log_fetch_buffer.add(Box::new(completed_fetch));
+                            }
+                            Err(e) => {
+                                // todo: handle error
+                                warn!("Failed to create completed fetch: {:?}", e);
+                            }
                         }
-                    } else {
-                        // if the offset is null, it means the bucket has been unsubscribed,
-                        // skip processing and continue to the next bucket.
-                        continue;
-                    }
-                } else if fetch_log_for_bucket.records.is_some() {
-                    // Handle regular in-memory records
-                    let mut fetch_records = vec![];
-                    let data = fetch_log_for_bucket.records.unwrap();
-                    for log_record in &mut LogRecordsBatchs::new(&data) {
-                        let last_offset = log_record.last_log_offset();
-                        fetch_records.extend(log_record.records(&self.read_context)?);
-                        self.log_scanner_status
-                            .update_offset(&table_bucket, last_offset + 1);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Collect completed fetches from the buffer
+    /// Reference: LogFetchCollector.collectFetch in Java
+    fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+        const MAX_POLL_RECORDS: usize = 500; // Default max poll records
+        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+        let mut records_remaining = MAX_POLL_RECORDS;
+
+        while records_remaining > 0 {
+            // Take the next-in-line fetch, or pull a new one from the buffer
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            if next_in_line.is_none() || next_in_line.as_ref().unwrap().is_consumed() {
+                // Get a new fetch from the buffer
+                if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                    // Initialize the fetch if not already initialized
+                    if !completed_fetch.is_initialized() {
+                        let size_in_bytes = completed_fetch.size_in_bytes();
+                        match self.initialize_fetch(completed_fetch) {
+                            Ok(initialized) => {
+                                self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                continue;
+                            }
+                            Err(e) => {
+                                // Remove a completed fetch upon a parse exception if
+                                // (1) it contains no records, and
+                                // (2) no fetched records with actual content precede
+                                //     this exception
+                                if result.is_empty() && size_in_bytes == 0 {
+                                    // todo: consider it?
+                                    // self.log_fetch_buffer.poll();
+                                }
+                                return Err(e);
+                            }
                         }
-                        result.insert(table_bucket, fetch_records);
+                    } else {
+                        self.log_fetch_buffer.set_next_in_line_fetch(Some(completed_fetch));
+                    }
+                    // Note: poll() already removed this fetch from the buffer
+                } else {
+                    // No more fetches available
+                    break;
+                }
+            } else {
+                // Fetch records from the next-in-line fetch
+                if let Some(mut next_fetch) = next_in_line {
+                    let records =
+                        self.fetch_records_from_fetch(&mut next_fetch, records_remaining)?;
+
+                    if !records.is_empty() {
+                        let table_bucket = next_fetch.table_bucket().clone();
+                        // Merge with existing records for this bucket
+                        let existing = result.entry(table_bucket).or_default();
+                        let records_count = records.len();
+                        existing.extend(records);
+
+                        records_remaining = records_remaining.saturating_sub(records_count);
+                    }
+                    // Park the fetch again; it stays next-in-line until consumed
+                    self.log_fetch_buffer.set_next_in_line_fetch(Some(next_fetch));
+                }
+            }
         }
 
@@ -338,6 +505,90 @@ impl LogFetcher {
         Ok(result)
     }
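`collect_fetches` spreads a fixed budget (`MAX_POLL_RECORDS`) across however many buffered fetches it takes, and a fetch that is only partially drained is parked as next-in-line for the following poll. The budget arithmetic in isolation, with a `Vec<u32>` standing in for one fetch's records:

    use std::collections::VecDeque;

    fn collect(
        queue: &mut VecDeque<Vec<u32>>,
        next_in_line: &mut Option<Vec<u32>>,
        max_records: usize,
    ) -> Vec<u32> {
        let mut out = Vec::new();
        while out.len() < max_records {
            let mut fetch = match next_in_line.take().or_else(|| queue.pop_front()) {
                Some(f) => f,
                None => break, // nothing buffered
            };
            let take = (max_records - out.len()).min(fetch.len());
            out.extend(fetch.drain(..take));
            if !fetch.is_empty() {
                // budget exhausted mid-fetch: keep the rest for the next poll
                *next_in_line = Some(fetch);
            }
        }
        out
    }

    fn main() {
        let mut queue = VecDeque::from(vec![vec![1, 2, 3], vec![4, 5]]);
        let mut next_in_line = None;
        assert_eq!(collect(&mut queue, &mut next_in_line, 4), vec![1, 2, 3, 4]);
        assert_eq!(collect(&mut queue, &mut next_in_line, 4), vec![5]); // parked remainder
    }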
+
+    /// Initialize a completed fetch, checking offset match and updating the high watermark
+    fn initialize_fetch(
+        &self,
+        mut completed_fetch: Box<dyn CompletedFetch>,
+    ) -> Result<Option<Box<dyn CompletedFetch>>> {
+        // todo: handle initialize failure
+
+        let table_bucket = completed_fetch.table_bucket().clone();
+        let fetch_offset = completed_fetch.fetch_offset();
+
+        // Check if the bucket is still subscribed
+        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
+        if current_offset.is_none() {
+            warn!(
+                "Discarding stale fetch response for bucket {:?} since the bucket has been unsubscribed",
+                table_bucket
+            );
+            return Ok(None);
+        }
+
+        let current_offset = current_offset.unwrap();
+
+        // Check if the offset matches
+        if fetch_offset != current_offset {
+            warn!(
+                "Discarding stale fetch response for bucket {:?} since its offset {} does not match the expected offset {}",
+                table_bucket, fetch_offset, current_offset
+            );
+            return Ok(None);
+        }
+
+        // Update the high watermark
+        let high_watermark = completed_fetch.high_watermark();
+        if high_watermark >= 0 {
+            self.log_scanner_status.update_high_watermark(&table_bucket, high_watermark);
+        }
+
+        completed_fetch.set_initialized();
+        Ok(Some(completed_fetch))
+    }
+
+    /// Fetch records from a completed fetch, checking offset match
+    fn fetch_records_from_fetch(
+        &self,
+        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+        max_records: usize,
+    ) -> Result<Vec<ScanRecord>> {
+        let table_bucket = next_in_line_fetch.table_bucket().clone();
+        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+        if current_offset.is_none() {
+            warn!(
+                "Ignoring fetched records for {:?} since the bucket has been unsubscribed",
+                table_bucket
+            );
+            next_in_line_fetch.drain();
+            return Ok(Vec::new());
+        }
+
+        let current_offset = current_offset.unwrap();
+        let fetch_offset = next_in_line_fetch.fetch_offset();
+
+        // Check if this fetch is next in line
+        if fetch_offset == current_offset {
+            let records = next_in_line_fetch.fetch_records(max_records)?;
+            let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+            if next_fetch_offset > current_offset {
+                self.log_scanner_status.update_offset(&table_bucket, next_fetch_offset);
+            }
+
+            Ok(records)
+        } else {
+            // These records aren't next in line, ignore them
+            warn!(
+                "Ignoring fetched records for {:?} at offset {} since the current offset is {}",
+                table_bucket, fetch_offset, current_offset
+            );
+            next_in_line_fetch.drain();
+            Ok(Vec::new())
+        }
+    }
+
     async fn prepare_fetch_log_requests(&self) -> HashMap<i32, FetchLogRequest> {
         let mut fetch_log_req_for_buckets = HashMap::new();
         let mut table_id = None;
@@ -356,19 +607,22 @@ impl LogFetcher {
             };
 
             if let Some(leader) = self.get_table_bucket_leader(&bucket) {
-                let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
-                    partition_id: None,
-                    bucket_id: bucket.bucket_id(),
-                    fetch_offset: offset,
-                    // 1M
-                    max_fetch_bytes: 1024 * 1024,
-                };
+                if !self.nodes_with_pending_fetch_requests.lock().contains(&leader) {
+                    let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
+                        partition_id: None,
+                        bucket_id: bucket.bucket_id(),
+                        fetch_offset: offset,
+                        // 1M
+                        max_fetch_bytes: 1024 * 1024,
+                    };
 
-                fetch_log_req_for_buckets
-                    .entry(leader)
-                    .or_insert_with(Vec::new)
-                    .push(fetch_log_req_for_bucket);
-                ready_for_fetch_count += 1;
+                    fetch_log_req_for_buckets
+                        .entry(leader)
+                        .or_insert_with(Vec::new)
+                        .push(fetch_log_req_for_bucket);
+                    ready_for_fetch_count += 1;
+                }
             }
         }
 
@@ -405,8 +659,10 @@ impl LogFetcher {
     }
 
     fn fetchable_buckets(&self) -> Vec<TableBucket> {
-        // always available now
-        self.log_scanner_status.fetchable_buckets(|_| true)
+        // Get buckets that are not already in the buffer
+        let buffered = self.log_fetch_buffer.buffered_buckets();
+        let buffered_set: HashSet<TableBucket> = buffered.into_iter().collect();
+        self.log_scanner_status.fetchable_buckets(|tb| !buffered_set.contains(tb))
     }
 
     fn get_table_bucket_leader(&self, tb: &TableBucket) -> Option<i32> {
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index a058bfe..a54f469 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -148,7 +148,7 @@ mod table_test {
         }
 
         let scan_records = log_scanner
-            .poll(std::time::Duration::from_secs(5))
+            .poll(std::time::Duration::from_secs(60))
             .await
             .expect("Failed to poll");