This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 076cbd6 chore: introduce prefetch to improve log poll performance
(#103)
076cbd6 is described below
commit 076cbd62b0bf8646e4af330275cc23248ab39505
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Dec 21 15:22:05 2025 +0800
chore: introduce prefetch to improve log poll performance (#103)
---
crates/fluss/Cargo.toml | 1 +
crates/fluss/src/client/credentials.rs | 30 +-
crates/fluss/src/client/table/log_fetch_buffer.rs | 376 +++++++++++++
crates/fluss/src/client/table/mod.rs | 1 +
crates/fluss/src/client/table/remote_log.rs | 195 +++++--
crates/fluss/src/client/table/scanner.rs | 586 ++++++++++++++++-----
crates/fluss/src/record/arrow.rs | 113 ++--
crates/fluss/tests/integration/table.rs | 6 +-
.../fluss/tests/integration/table_remote_scan.rs | 8 +-
9 files changed, 1077 insertions(+), 239 deletions(-)
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index cdba9de..27604ee 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -58,6 +58,7 @@ url = "2.5.7"
uuid = { version = "1.10", features = ["v4"] }
tempfile = "3.23.0"
snafu = "0.8.3"
+scopeguard = "1.2.0"
[target.'cfg(target_arch = "wasm32")'.dependencies]
jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/client/credentials.rs
b/crates/fluss/src/client/credentials.rs
index 6b07d08..8adfe48 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -90,20 +90,20 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) ->
Option<(String, bool)> {
pub struct CredentialsCache {
inner: RwLock<Option<CachedToken>>,
+ rpc_client: Arc<RpcClient>,
+ metadata: Arc<Metadata>,
}
impl CredentialsCache {
- pub fn new() -> Self {
+ pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
Self {
inner: RwLock::new(None),
+ rpc_client,
+ metadata,
}
}
- pub async fn get_or_refresh(
- &self,
- rpc_client: &Arc<RpcClient>,
- metadata: &Arc<Metadata>,
- ) -> Result<HashMap<String, String>> {
+ pub async fn get_or_refresh(&self) -> Result<HashMap<String, String>> {
{
let guard = self.inner.read();
if let Some(cached) = guard.as_ref() {
@@ -113,17 +113,13 @@ impl CredentialsCache {
}
}
- self.refresh_from_server(rpc_client, metadata).await
+ self.refresh_from_server().await
}
- async fn refresh_from_server(
- &self,
- rpc_client: &Arc<RpcClient>,
- metadata: &Arc<Metadata>,
- ) -> Result<HashMap<String, String>> {
- let cluster = metadata.get_cluster();
+ async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
+ let cluster = self.metadata.get_cluster();
let server_node = cluster.get_one_available_server();
- let conn = rpc_client.get_connection(server_node).await?;
+ let conn = self.rpc_client.get_connection(server_node).await?;
let request = GetSecurityTokenRequest::new();
let response = conn.request(request).await?;
@@ -158,9 +154,3 @@ impl CredentialsCache {
Ok(props)
}
}
-
-impl Default for CredentialsCache {
- fn default() -> Self {
- Self::new()
- }
-}
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
new file mode 100644
index 0000000..cee104e
--- /dev/null
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::metadata::TableBucket;
+use crate::record::{
+ LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext,
ScanRecord,
+};
+use parking_lot::Mutex;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::Notify;
+
+/// Represents a completed fetch that can be consumed
+pub trait CompletedFetch: Send + Sync {
+ fn table_bucket(&self) -> &TableBucket;
+ fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+ fn is_consumed(&self) -> bool;
+ fn drain(&mut self);
+ fn size_in_bytes(&self) -> usize;
+ fn high_watermark(&self) -> i64;
+ fn is_initialized(&self) -> bool;
+ fn set_initialized(&mut self);
+ fn next_fetch_offset(&self) -> i64;
+}
+
+/// Represents a pending fetch that is waiting to be completed
+pub trait PendingFetch: Send + Sync {
+ fn table_bucket(&self) -> &TableBucket;
+ fn is_completed(&self) -> bool;
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>>;
+}
+
+/// Thread-safe buffer for completed fetches
+pub struct LogFetchBuffer {
+ completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
+ pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn
PendingFetch>>>>,
+ next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
+ not_empty_notify: Notify,
+ woken_up: Arc<AtomicBool>,
+}
+
+impl LogFetchBuffer {
+ pub fn new() -> Self {
+ Self {
+ completed_fetches: Mutex::new(VecDeque::new()),
+ pending_fetches: Mutex::new(HashMap::new()),
+ next_in_line_fetch: Mutex::new(None),
+ not_empty_notify: Notify::new(),
+ woken_up: Arc::new(AtomicBool::new(false)),
+ }
+ }
+
+ /// Check if the buffer is empty
+ pub fn is_empty(&self) -> bool {
+ self.completed_fetches.lock().is_empty()
+ }
+
+ /// Wait for the buffer to become non-empty, with timeout
+ /// Returns true if data became available, false if timeout
+ pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+ let deadline = std::time::Instant::now() + timeout;
+
+ loop {
+ // Check if buffer is not empty
+ if !self.is_empty() {
+ return true;
+ }
+
+ // Check if woken up
+ if self.woken_up.swap(false, Ordering::Acquire) {
+ return true;
+ }
+
+ // Check if timeout
+ let now = std::time::Instant::now();
+ if now >= deadline {
+ return false;
+ }
+
+ // Wait for notification with remaining time
+ let remaining = deadline - now;
+ let notified = self.not_empty_notify.notified();
+ tokio::select! {
+ _ = tokio::time::sleep(remaining) => {
+ return false; // Timeout
+ }
+ _ = notified => {
+ // Got notification, check again
+ continue;
+ }
+ }
+ }
+ }
+
+ #[allow(dead_code)]
+ /// Wake up any waiting threads
+ pub fn wakeup(&self) {
+ self.woken_up.store(true, Ordering::Release);
+ self.not_empty_notify.notify_waiters();
+ }
+
+ /// Add a pending fetch to the buffer
+ pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
+ let table_bucket = pending_fetch.table_bucket().clone();
+ self.pending_fetches
+ .lock()
+ .entry(table_bucket)
+ .or_default()
+ .push_back(pending_fetch);
+ }
+
+ /// Try to complete pending fetches in order, converting them to completed
fetches
+ pub fn try_complete(&self, table_bucket: &TableBucket) {
+ // Collect completed fetches while holding the pending_fetches lock,
+ // then push them to completed_fetches after releasing it to avoid
+ // holding both locks simultaneously.
+ let mut completed_to_push: Vec<Box<dyn CompletedFetch>> = Vec::new();
+ let mut has_completed = false;
+ {
+ let mut pending_map = self.pending_fetches.lock();
+ if let Some(pendings) = pending_map.get_mut(table_bucket) {
+ while let Some(front) = pendings.front() {
+ if front.is_completed() {
+ let pending = pendings.pop_front().unwrap();
+ match pending.to_completed_fetch() {
+ Ok(completed) => {
+ completed_to_push.push(completed);
+ has_completed = true;
+ }
+ Err(e) => {
+ // todo: propagate or handle this error instead of only logging?
+ log::error!("Error when completing: {e}");
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ if has_completed && pendings.is_empty() {
+ pending_map.remove(table_bucket);
+ }
+ }
+ }
+
+ if !completed_to_push.is_empty() {
+ let mut completed_queue = self.completed_fetches.lock();
+ for completed in completed_to_push {
+ completed_queue.push_back(completed);
+ }
+ }
+
+ if has_completed {
+ // Signal that buffer is not empty
+ self.not_empty_notify.notify_waiters();
+ }
+ }
+
+ /// Add a completed fetch to the buffer
+ pub fn add(&self, completed_fetch: Box<dyn CompletedFetch>) {
+ let table_bucket = completed_fetch.table_bucket();
+ let mut pending_map = self.pending_fetches.lock();
+
+ if let Some(pendings) = pending_map.get_mut(table_bucket)
+ && !pendings.is_empty()
+ {
+
pendings.push_back(Box::new(CompletedPendingFetch::new(completed_fetch)));
+ return;
+ }
+ // If there's no pending fetch for this table_bucket,
+ // directly add to completed_fetches
+ self.completed_fetches.lock().push_back(completed_fetch);
+ self.not_empty_notify.notify_waiters();
+ }
+
+ /// Poll the next completed fetch
+ pub fn poll(&self) -> Option<Box<dyn CompletedFetch>> {
+ self.completed_fetches.lock().pop_front()
+ }
+
+ /// Get the next in line fetch
+ pub fn next_in_line_fetch(&self) -> Option<Box<dyn CompletedFetch>> {
+ self.next_in_line_fetch.lock().take()
+ }
+
+ /// Set the next in line fetch
+ pub fn set_next_in_line_fetch(&self, fetch: Option<Box<dyn
CompletedFetch>>) {
+ *self.next_in_line_fetch.lock() = fetch;
+ }
+
+ /// Get the set of buckets that have buffered data
+ pub fn buffered_buckets(&self) -> Vec<TableBucket> {
+ let mut buckets = Vec::new();
+
+ let next_in_line_fetch = self.next_in_line_fetch.lock();
+ if let Some(complete_fetch) = next_in_line_fetch.as_ref() {
+ if !complete_fetch.is_consumed() {
+ buckets.push(complete_fetch.table_bucket().clone());
+ }
+ }
+
+ let completed = self.completed_fetches.lock();
+ for fetch in completed.iter() {
+ buckets.push(fetch.table_bucket().clone());
+ }
+ let pending = self.pending_fetches.lock();
+ buckets.extend(pending.keys().cloned());
+ buckets
+ }
+}
+
+impl Default for LogFetchBuffer {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// A wrapper that makes a completed fetch look like a pending fetch
+struct CompletedPendingFetch {
+ completed_fetch: Box<dyn CompletedFetch>,
+}
+
+impl CompletedPendingFetch {
+ fn new(completed_fetch: Box<dyn CompletedFetch>) -> Self {
+ Self { completed_fetch }
+ }
+}
+
+impl PendingFetch for CompletedPendingFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ self.completed_fetch.table_bucket()
+ }
+
+ fn is_completed(&self) -> bool {
+ true
+ }
+
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+ Ok(self.completed_fetch)
+ }
+}
+
+/// Default implementation of CompletedFetch for in-memory log records
+pub struct DefaultCompletedFetch {
+ table_bucket: TableBucket,
+ log_record_batch: LogRecordsBatches,
+ read_context: ReadContext,
+ next_fetch_offset: i64,
+ high_watermark: i64,
+ size_in_bytes: usize,
+ consumed: bool,
+ initialized: bool,
+ records_read: usize,
+ current_record_iterator: Option<LogRecordIterator>,
+ current_record_batch: Option<LogRecordBatch>,
+}
+
+impl DefaultCompletedFetch {
+ pub fn new(
+ table_bucket: TableBucket,
+ log_record_batch: LogRecordsBatches,
+ size_in_bytes: usize,
+ read_context: ReadContext,
+ fetch_offset: i64,
+ high_watermark: i64,
+ ) -> Result<Self> {
+ Ok(Self {
+ table_bucket,
+ log_record_batch,
+ read_context,
+ next_fetch_offset: fetch_offset,
+ high_watermark,
+ size_in_bytes,
+ consumed: false,
+ initialized: false,
+ records_read: 0,
+ current_record_iterator: None,
+ current_record_batch: None,
+ })
+ }
+
+ /// Get the next fetched record, handling batch iteration and record
skipping
+ fn next_fetched_record(&mut self) -> Result<Option<ScanRecord>> {
+ loop {
+ if let Some(record) = self
+ .current_record_iterator
+ .as_mut()
+ .and_then(Iterator::next)
+ {
+ if record.offset() >= self.next_fetch_offset {
+ return Ok(Some(record));
+ }
+ } else if let Some(batch) = self.log_record_batch.next() {
+ self.current_record_iterator =
Some(batch.records(&self.read_context)?);
+ self.current_record_batch = Some(batch);
+ } else {
+ if let Some(batch) = self.current_record_batch.take() {
+ self.next_fetch_offset = batch.next_log_offset();
+ }
+ self.drain();
+ return Ok(None);
+ }
+ }
+ }
+}
+
+impl CompletedFetch for DefaultCompletedFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ &self.table_bucket
+ }
+
+ fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>
{
+ // todo: handle corrupt_last_record
+ if self.consumed {
+ return Ok(Vec::new());
+ }
+
+ let mut scan_records = Vec::new();
+
+ for _ in 0..max_records {
+ if let Some(record) = self.next_fetched_record()? {
+ self.next_fetch_offset = record.offset() + 1;
+ self.records_read += 1;
+ scan_records.push(record);
+ } else {
+ break;
+ }
+ }
+
+ Ok(scan_records)
+ }
+
+ fn is_consumed(&self) -> bool {
+ self.consumed
+ }
+
+ fn drain(&mut self) {
+ self.consumed = true;
+ }
+
+ fn size_in_bytes(&self) -> usize {
+ self.size_in_bytes
+ }
+
+ fn high_watermark(&self) -> i64 {
+ self.high_watermark
+ }
+
+ fn is_initialized(&self) -> bool {
+ self.initialized
+ }
+
+ fn set_initialized(&mut self) {
+ self.initialized = true;
+ }
+
+ fn next_fetch_offset(&self) -> i64 {
+ self.next_fetch_offset
+ }
+}
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 9972247..e2cf9e6 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;
mod append;
+mod log_fetch_buffer;
mod remote_log;
mod scanner;
mod writer;
diff --git a/crates/fluss/src/client/table/remote_log.rs
b/crates/fluss/src/client/table/remote_log.rs
index 10273dd..d9abd19 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -14,16 +14,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use crate::client::table::log_fetch_buffer::{CompletedFetch,
DefaultCompletedFetch, PendingFetch};
use crate::error::{Error, Result};
use crate::io::{FileIO, Storage};
use crate::metadata::TableBucket;
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
+use crate::record::{LogRecordsBatches, ReadContext};
use crate::util::delete_file;
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
+use std::sync::Arc;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use tokio::sync::oneshot;
@@ -70,45 +72,121 @@ pub struct RemoteLogFetchInfo {
}
impl RemoteLogFetchInfo {
- pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket)
-> Result<Self> {
+ pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket)
-> Self {
let segments = info
.remote_log_segments
.iter()
.map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone()))
.collect();
- Ok(Self {
+ Self {
remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
partition_name: info.partition_name.clone(),
remote_log_segments: segments,
first_start_pos: info.first_start_pos.unwrap_or(0),
- })
+ }
}
}
+type CompletionCallback = Box<dyn Fn() + Send + Sync>;
+
/// Future for a remote log download request
pub struct RemoteLogDownloadFuture {
- receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
+ result: Arc<Mutex<Option<Result<Vec<u8>>>>>,
+ completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
+ // todo: add recycleCallback
}
impl RemoteLogDownloadFuture {
- pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self {
+ pub fn new(receiver: oneshot::Receiver<Result<Vec<u8>>>) -> Self {
+ let result = Arc::new(Mutex::new(None));
+ let result_clone = Arc::clone(&result);
+ let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> =
+ Arc::new(Mutex::new(Vec::new()));
+ let callbacks_clone = Arc::clone(&completion_callbacks);
+
+ // Spawn a task to wait for the download and update result, then call
callbacks
+ tokio::spawn(async move {
+ let download_result = match receiver.await {
+ Ok(Ok(path)) => Ok(path),
+ Ok(Err(e)) => Err(e),
+ Err(e) => Err(Error::UnexpectedError {
+ message: format!("Download & Read future cancelled:
{e:?}"),
+ source: None,
+ }),
+ };
+
+ *result_clone.lock() = Some(download_result);
+
+ // Call all registered callbacks
+ // We need to take the callbacks to avoid holding the lock while
calling them
+ // This also ensures that any callbacks registered after this
point will be called immediately
+ let callbacks: Vec<CompletionCallback> = {
+ let mut callbacks_guard = callbacks_clone.lock();
+ std::mem::take(&mut *callbacks_guard)
+ };
+ for callback in callbacks {
+ callback();
+ }
+
+ // After calling callbacks, any new callbacks registered will see
is_done() == true
+ // and will be called immediately in on_complete()
+ });
+
Self {
- receiver: Some(receiver),
+ result,
+ completion_callbacks,
}
}
- /// Get the downloaded file path
- pub async fn get_file_path(&mut self) -> Result<PathBuf> {
- let receiver = self.receiver.take().ok_or_else(||
Error::UnexpectedError {
- message: "Downloaded file already consumed".to_string(),
- source: None,
- })?;
-
- receiver.await.map_err(|e| Error::UnexpectedError {
- message: format!("Download future cancelled: {e:?}"),
- source: None,
- })?
+ /// Register a callback to be called when download completes (similar to
Java's onComplete)
+ pub fn on_complete<F>(&self, callback: F)
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ // Acquire callbacks lock first to ensure atomicity of the
check-and-register operation
+ let mut callbacks_guard = self.completion_callbacks.lock();
+
+ // Check completion status while holding the callbacks lock.
+ // This ensures that:
+ // 1. If the task completes between checking is_done() and registering
the callback,
+ // we'll see the completion state correctly
+ // 2. The background task cannot clear the callbacks list while we're
checking/registering
+ let is_done = self.result.lock().is_some();
+
+ if is_done {
+ // If already completed, call immediately (drop lock first to
avoid deadlock)
+ drop(callbacks_guard);
+ callback();
+ } else {
+ // Register the callback while holding the callbacks lock.
+ // This ensures that even if the background task completes right
after we check
+ // is_done(), it will wait for us to release the lock before
taking callbacks.
+ // When it does take callbacks, it will see our callback in the
list and execute it.
+ callbacks_guard.push(Box::new(callback));
+ // Lock is automatically released here
+ }
+ }
+
+ pub fn is_done(&self) -> bool {
+ self.result.lock().is_some()
+ }
+
+ /// Get the downloaded remote log bytes (synchronous, only valid after is_done()
returns true)
+ pub fn get_remote_log_bytes(&self) -> Result<Vec<u8>> {
+ // todo: handle download fail
+ let guard = self.result.lock();
+ match guard.as_ref() {
+ Some(Ok(path)) => Ok(path.clone()),
+ Some(Err(e)) => Err(Error::IoUnexpectedError {
+ message: format!("Fail to get remote log bytes: {e}"),
+ source: io::Error::other(format!("{e:?}")),
+ }),
+ None => Err(Error::IoUnexpectedError {
+ message: "Get remote log bytes not completed yet".to_string(),
+ source: io::Error::other("Get remote log bytes not completed
yet"),
+ }),
+ }
}
}
@@ -135,25 +213,38 @@ impl RemoteLogDownloader {
&self,
remote_log_tablet_dir: &str,
segment: &RemoteLogSegment,
- ) -> Result<RemoteLogDownloadFuture> {
+ ) -> RemoteLogDownloadFuture {
let (sender, receiver) = oneshot::channel();
let local_file_name = segment.local_file_name();
let local_file_path = self.local_log_dir.path().join(&local_file_name);
let remote_path = self.build_remote_path(remote_log_tablet_dir,
segment);
let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
let remote_fs_props = self.remote_fs_props.read().clone();
- // Spawn async download task
+ // Spawn async download & read task
tokio::spawn(async move {
- let result = Self::download_file(
- &remote_log_tablet_dir,
- &remote_path,
- &local_file_path,
- &remote_fs_props,
- )
+ let result = async {
+ let file_path = Self::download_file(
+ &remote_log_tablet_dir,
+ &remote_path,
+ &local_file_path,
+ &remote_fs_props,
+ )
+ .await?;
+ let bytes = tokio::fs::read(&file_path).await?;
+
+ // Delete the downloaded local file to free disk (async, but
we'll do it in background)
+ let file_path_clone = file_path.clone();
+ tokio::spawn(async move {
+ let _ = delete_file(file_path_clone).await;
+ });
+
+ Ok(bytes)
+ }
.await;
+
let _ = sender.send(result);
});
- Ok(RemoteLogDownloadFuture::new(receiver))
+ RemoteLogDownloadFuture::new(receiver)
}
/// Build the remote path for a log segment
@@ -261,9 +352,7 @@ pub struct RemotePendingFetch {
segment: RemoteLogSegment,
download_future: RemoteLogDownloadFuture,
pos_in_log_segment: i32,
- #[allow(dead_code)]
fetch_offset: i64,
- #[allow(dead_code)]
high_watermark: i64,
read_context: ReadContext,
}
@@ -286,32 +375,42 @@ impl RemotePendingFetch {
read_context,
}
}
+}
+
+impl PendingFetch for RemotePendingFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ &self.segment.table_bucket
+ }
- /// Convert to completed fetch by reading the downloaded file
- pub async fn convert_to_completed_fetch(
- mut self,
- ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
- let file_path = self.download_future.get_file_path().await?;
- let file_data = tokio::fs::read(&file_path).await?;
+ fn is_completed(&self) -> bool {
+ self.download_future.is_done()
+ }
+
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+ // Get the remote log bytes (this should only be called when is_completed()
returns true)
+ let mut data = self.download_future.get_remote_log_bytes()?;
// Slice the data if needed
let data = if self.pos_in_log_segment > 0 {
- &file_data[self.pos_in_log_segment as usize..]
+ data.split_off(self.pos_in_log_segment as usize)
} else {
- &file_data
+ data
};
- // delete the downloaded local file to free disk
- delete_file(file_path).await;
+ let size_in_bytes = data.len();
- // Parse log records (remote log contains full data, need client-side
projection)
- let mut fetch_records = vec![];
- for log_record in &mut LogRecordsBatchs::new(data) {
-
fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
- }
+ let log_record_batch = LogRecordsBatches::new(data);
+
+ // Create DefaultCompletedFetch from the data
+ let completed_fetch = DefaultCompletedFetch::new(
+ self.segment.table_bucket,
+ log_record_batch,
+ size_in_bytes,
+ self.read_context,
+ self.fetch_offset,
+ self.high_watermark,
+ )?;
- let mut result = HashMap::new();
- result.insert(self.segment.table_bucket.clone(), fetch_records);
- Ok(result)
+ Ok(Box::new(completed_fetch))
}
}
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index a9384d9..2246e2c 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -18,24 +18,27 @@
use crate::client::connection::FlussConnection;
use crate::client::credentials::CredentialsCache;
use crate::client::metadata::Metadata;
+use crate::client::table::log_fetch_buffer::{
+ CompletedFetch, DefaultCompletedFetch, LogFetchBuffer,
+};
+use crate::client::table::remote_log::{
+ RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
+};
use crate::error::{Error, Result};
use crate::metadata::{TableBucket, TableInfo, TablePath};
use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket,
PbFetchLogReqForTable};
-use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords,
to_arrow_schema};
-use crate::rpc::RpcClient;
+use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords,
to_arrow_schema};
+use crate::rpc::{RpcClient, message};
use crate::util::FairBucketStatusMap;
use arrow_schema::SchemaRef;
-use parking_lot::RwLock;
-use std::collections::HashMap;
+use log::{debug, error, warn};
+use parking_lot::{Mutex, RwLock};
+use std::collections::{HashMap, HashSet};
use std::slice::from_ref;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
-use crate::client::table::remote_log::{
- RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
-};
-
const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
#[allow(dead_code)]
const LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024;
@@ -171,8 +174,43 @@ impl LogScanner {
})
}
- pub async fn poll(&self, _timeout: Duration) -> Result<ScanRecords> {
- Ok(ScanRecords::new(self.poll_for_fetches().await?))
+ pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+ let start = std::time::Instant::now();
+ let deadline = start + timeout;
+
+ loop {
+ // Try to collect fetches
+ let fetch_result = self.poll_for_fetches().await?;
+
+ if !fetch_result.is_empty() {
+ // We have data, send next round of fetches and return
+ // This enables pipelining while user processes the data
+ self.log_fetcher.send_fetches().await?;
+ return Ok(ScanRecords::new(fetch_result));
+ }
+
+ // No data available, check if we should wait
+ let now = std::time::Instant::now();
+ if now >= deadline {
+ // Timeout reached, return empty result
+ return Ok(ScanRecords::new(HashMap::new()));
+ }
+
+ // Wait for buffer to become non-empty with remaining time
+ let remaining = deadline - now;
+ let has_data = self
+ .log_fetcher
+ .log_fetch_buffer
+ .await_not_empty(remaining)
+ .await;
+
+ if !has_data {
+ // Timeout while waiting
+ return Ok(ScanRecords::new(HashMap::new()));
+ }
+
+ // Buffer became non-empty, try again
+ }
}
pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
@@ -208,20 +246,31 @@ impl LogScanner {
}
async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket,
Vec<ScanRecord>>> {
- self.log_fetcher.send_fetches_and_collect().await
+ let result = self.log_fetcher.collect_fetches()?;
+ if !result.is_empty() {
+ return Ok(result);
+ }
+
+ // send any new fetches (won't resend pending fetches).
+ self.log_fetcher.send_fetches().await?;
+
+ // Collect completed fetches from buffer
+ self.log_fetcher.collect_fetches()
}
}
-#[allow(dead_code)]
struct LogFetcher {
- table_path: TablePath,
conns: Arc<RpcClient>,
- table_info: TableInfo,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
read_context: ReadContext,
- remote_log_downloader: RemoteLogDownloader,
- credentials_cache: CredentialsCache,
+ remote_read_context: ReadContext,
+ remote_log_downloader: Arc<RemoteLogDownloader>,
+ // todo: consider schedule a background thread to update
+ // token instead of update in fetch phase
+ credentials_cache: Arc<CredentialsCache>,
+ log_fetch_buffer: Arc<LogFetchBuffer>,
+ nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
}
impl LogFetcher {
@@ -233,126 +282,306 @@ impl LogFetcher {
projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
- let read_context = Self::create_read_context(full_arrow_schema,
projected_fields.clone());
+ let read_context =
+ Self::create_read_context(full_arrow_schema.clone(),
projected_fields.clone(), false);
+ let remote_read_context =
+ Self::create_read_context(full_arrow_schema,
projected_fields.clone(), true);
let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
Ok(LogFetcher {
- table_path: table_info.table_path.clone(),
- conns,
- table_info,
- metadata,
+ conns: conns.clone(),
+ metadata: metadata.clone(),
log_scanner_status,
read_context,
- remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
- credentials_cache: CredentialsCache::new(),
+ remote_read_context,
+ remote_log_downloader:
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+ credentials_cache: Arc::new(CredentialsCache::new(conns.clone(),
metadata.clone())),
+ log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+ nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
})
}
fn create_read_context(
full_arrow_schema: SchemaRef,
projected_fields: Option<Vec<usize>>,
+ is_from_remote: bool,
) -> ReadContext {
match projected_fields {
- None => ReadContext::new(full_arrow_schema),
- Some(fields) =>
ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+ None => ReadContext::new(full_arrow_schema, is_from_remote),
+ Some(fields) => {
+ ReadContext::with_projection_pushdown(full_arrow_schema,
fields, is_from_remote)
+ }
}
}
- async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket,
Vec<ScanRecord>>> {
+ /// Send fetch requests asynchronously without waiting for responses
+ async fn send_fetches(&self) -> Result<()> {
+ // todo: check update metadata like fluss-java in case leader changes
let fetch_request = self.prepare_fetch_log_requests().await;
- let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
for (leader, fetch_request) in fetch_request {
- let cluster = self.metadata.get_cluster();
- let server_node = cluster
- .get_tablet_server(leader)
- .expect("todo: handle leader not exist.");
- let con = self.conns.get_connection(server_node).await?;
-
- let fetch_response = con
-
.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
- .await?;
-
- for pb_fetch_log_resp in fetch_response.tables_resp {
- let table_id = pb_fetch_log_resp.table_id;
- let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
- for fetch_log_for_bucket in fetch_log_for_buckets {
- let bucket: i32 = fetch_log_for_bucket.bucket_id;
- let table_bucket = TableBucket::new(table_id, bucket);
-
- // Check if this is a remote log fetch
- if let Some(ref remote_log_fetch_info) =
- fetch_log_for_bucket.remote_log_fetch_info
- {
- let remote_fs_props = self
- .credentials_cache
- .get_or_refresh(&self.conns, &self.metadata)
- .await?;
- self.remote_log_downloader
- .set_remote_fs_props(remote_fs_props);
- let remote_fetch_info = RemoteLogFetchInfo::from_proto(
- remote_log_fetch_info,
- table_bucket.clone(),
- )?;
-
- if let Some(fetch_offset) =
-
self.log_scanner_status.get_bucket_offset(&table_bucket)
- {
- let high_watermark =
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
- // Download and process remote log segments
- let mut pos_in_log_segment =
remote_fetch_info.first_start_pos;
- let mut current_fetch_offset = fetch_offset;
- // todo: make segment download in parallel
- for (i, segment) in
-
remote_fetch_info.remote_log_segments.iter().enumerate()
- {
- if i > 0 {
- pos_in_log_segment = 0;
- current_fetch_offset =
segment.start_offset;
- }
+ debug!("Adding pending request for node id {leader}");
+ // Mark this node as having an in-flight fetch request; it is removed
+ {
+ self.nodes_with_pending_fetch_requests.lock().insert(leader);
+ }
+
+ let cluster = self.metadata.get_cluster().clone();
+
+ let conns = Arc::clone(&self.conns);
+ let log_fetch_buffer = self.log_fetch_buffer.clone();
+ let log_scanner_status = self.log_scanner_status.clone();
+ let read_context = self.read_context.clone();
+ let remote_read_context = self.remote_read_context.clone();
+ let remote_log_downloader =
Arc::clone(&self.remote_log_downloader);
+ let creds_cache = self.credentials_cache.clone();
+ let nodes_with_pending =
self.nodes_with_pending_fetch_requests.clone();
+
+ // Spawn async task to handle the fetch request
+ // Note: These tasks are not explicitly tracked or cancelled when
LogFetcher is dropped.
+ // This is acceptable because:
+ // 1. Tasks will naturally complete (network requests will return
or timeout)
+ // 2. Tasks use Arc references, so resources are properly shared
+ // 3. When the program exits, tokio runtime will clean up all tasks
+ // 4. Tasks are short-lived (network I/O operations)
+ tokio::spawn(async move {
+ // ensure the leader is always removed from the pending set, even on early return
+ let _guard = scopeguard::guard((), |_| {
+ nodes_with_pending.lock().remove(&leader);
+ });
+
+ let server_node = cluster
+ .get_tablet_server(leader)
+ .expect("todo: handle leader not exist.");
+
+ let con = match conns.get_connection(server_node).await {
+ Ok(con) => con,
+ Err(e) => {
+ // todo: handle failed to get connection
+ warn!("Failed to get connection to destination node:
{e:?}");
+ return;
+ }
+ };
+
+ let fetch_response = match con
+ .request(message::FetchLogRequest::new(fetch_request))
+ .await
+ {
+ Ok(resp) => resp,
+ Err(e) => {
+ // todo: handle fetch log from destination node
+ warn!("Failed to fetch log from destination node
{server_node:?}: {e:?}");
+ return;
+ }
+ };
+
+ if let Err(e) = Self::handle_fetch_response(
+ fetch_response,
+ &log_fetch_buffer,
+ &log_scanner_status,
+ &read_context,
+ &remote_read_context,
+ &remote_log_downloader,
+ &creds_cache,
+ )
+ .await
+ {
+ // todo: handle fail to handle fetch response
+ error!("Fail to handle fetch response: {e:?}");
+ }
+ });
+ }
+
+ Ok(())
+ }
- let download_future =
-
self.remote_log_downloader.request_remote_log(
-
&remote_fetch_info.remote_log_tablet_dir,
- segment,
- )?;
- let pending_fetch = RemotePendingFetch::new(
- segment.clone(),
- download_future,
- pos_in_log_segment,
- current_fetch_offset,
- high_watermark,
- self.read_context.clone(),
- );
- let remote_records =
-
pending_fetch.convert_to_completed_fetch().await?;
- // Update offset and merge results
- for (tb, records) in remote_records {
- if let Some(last_record) = records.last() {
- self.log_scanner_status
- .update_offset(&tb,
last_record.offset() + 1);
- }
-
result.entry(tb).or_default().extend(records);
+ /// Handle fetch response and add completed fetches to buffer
+ async fn handle_fetch_response(
+ fetch_response: crate::proto::FetchLogResponse,
+ log_fetch_buffer: &Arc<LogFetchBuffer>,
+ log_scanner_status: &Arc<LogScannerStatus>,
+ read_context: &ReadContext,
+ remote_read_context: &ReadContext,
+ remote_log_downloader: &Arc<RemoteLogDownloader>,
+ credentials_cache: &Arc<CredentialsCache>,
+ ) -> Result<()> {
+ for pb_fetch_log_resp in fetch_response.tables_resp {
+ let table_id = pb_fetch_log_resp.table_id;
+ let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
+
+ for fetch_log_for_bucket in fetch_log_for_buckets {
+ let bucket: i32 = fetch_log_for_bucket.bucket_id;
+ let table_bucket = TableBucket::new(table_id, bucket);
+
+ // todo: check fetch result code for per-bucket
+ let Some(fetch_offset) =
log_scanner_status.get_bucket_offset(&table_bucket) else {
+ debug!(
+ "Ignoring fetch log response for bucket {table_bucket}
because the bucket has been unsubscribed."
+ );
+ continue;
+ };
+
+ // Check if this is a remote log fetch
+ if let Some(ref remote_log_fetch_info) =
fetch_log_for_bucket.remote_log_fetch_info
+ {
+ // set remote fs props
+ let remote_fs_props =
credentials_cache.get_or_refresh().await?;
+ remote_log_downloader.set_remote_fs_props(remote_fs_props);
+
+ let remote_fetch_info =
+ RemoteLogFetchInfo::from_proto(remote_log_fetch_info,
table_bucket.clone());
+
+ let high_watermark =
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+ Self::pending_remote_fetches(
+ remote_log_downloader.clone(),
+ log_fetch_buffer.clone(),
+ remote_read_context.clone(),
+ &table_bucket,
+ remote_fetch_info,
+ fetch_offset,
+ high_watermark,
+ );
+ } else if fetch_log_for_bucket.records.is_some() {
+ // Handle regular in-memory records - create completed
fetch directly
+ let high_watermark =
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+ let records =
fetch_log_for_bucket.records.unwrap_or(vec![]);
+ let size_in_bytes = records.len();
+ let log_record_batch = LogRecordsBatches::new(records);
+
+ match DefaultCompletedFetch::new(
+ table_bucket.clone(),
+ log_record_batch,
+ size_in_bytes,
+ read_context.clone(),
+ fetch_offset,
+ high_watermark,
+ ) {
+ Ok(completed_fetch) => {
+ log_fetch_buffer.add(Box::new(completed_fetch));
+ }
+ Err(e) => {
+ // todo: handle error
+ log::warn!("Failed to create completed fetch:
{e:?}");
+ }
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn pending_remote_fetches(
+ remote_log_downloader: Arc<RemoteLogDownloader>,
+ log_fetch_buffer: Arc<LogFetchBuffer>,
+ read_context: ReadContext,
+ table_bucket: &TableBucket,
+ remote_fetch_info: RemoteLogFetchInfo,
+ fetch_offset: i64,
+ high_watermark: i64,
+ ) {
+ // Download and process remote log segments
+ let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
+ let mut current_fetch_offset = fetch_offset;
+ for (i, segment) in
remote_fetch_info.remote_log_segments.iter().enumerate() {
+ if i > 0 {
+ pos_in_log_segment = 0;
+ current_fetch_offset = segment.start_offset;
+ }
+
+ // todo:
+ // 1: control the max threads to download remote segment
+ // 2: introduce priority queue to priority highest for earliest
segment
+ let download_future = remote_log_downloader
+ .request_remote_log(&remote_fetch_info.remote_log_tablet_dir,
segment);
+
+ // Register callback to be called when download completes
+ // (similar to Java's downloadFuture.onComplete)
+ // This must be done before creating RemotePendingFetch to avoid
move issues
+ let table_bucket = table_bucket.clone();
+ let log_fetch_buffer_clone = log_fetch_buffer.clone();
+ download_future.on_complete(move || {
+ log_fetch_buffer_clone.try_complete(&table_bucket);
+ });
+
+ let pending_fetch = RemotePendingFetch::new(
+ segment.clone(),
+ download_future,
+ pos_in_log_segment,
+ current_fetch_offset,
+ high_watermark,
+ read_context.clone(),
+ );
+ // Add to pending fetches in buffer (similar to Java's
logFetchBuffer.pend)
+ log_fetch_buffer.pend(Box::new(pending_fetch));
+ }
+ }
+
+ /// Collect completed fetches from buffer
+ /// Reference: LogFetchCollector.collectFetch in Java
+ fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>>
{
+ const MAX_POLL_RECORDS: usize = 500; // Default max poll records
+ let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+ let mut records_remaining = MAX_POLL_RECORDS;
+
+ while records_remaining > 0 {
+ // Get the next in line fetch, or get a new one from buffer
+ let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+ if next_in_line.is_none() ||
next_in_line.as_ref().unwrap().is_consumed() {
+ // Get a new fetch from buffer
+ if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+ // Initialize the fetch if not already initialized
+ if !completed_fetch.is_initialized() {
+ let size_in_bytes = completed_fetch.size_in_bytes();
+ match self.initialize_fetch(completed_fetch) {
+ Ok(initialized) => {
+
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+ continue;
+ }
+ Err(e) => {
+ // Remove a completedFetch upon a parse with
exception if
+ // (1) it contains no records, and
+ // (2) there are no fetched records with
actual content preceding this
+ // exception.
+ if result.is_empty() && size_in_bytes == 0 {
+ // todo: should we mirror the Java behavior here?
+ // self.log_fetch_buffer.poll();
}
+ return Err(e);
}
- } else {
- // if the offset is null, it means the bucket has
been unsubscribed,
- // skip processing and continue to the next bucket.
- continue;
}
- } else if fetch_log_for_bucket.records.is_some() {
- // Handle regular in-memory records
- let mut fetch_records = vec![];
- let data = fetch_log_for_bucket.records.unwrap();
- for log_record in &mut LogRecordsBatchs::new(&data) {
- let last_offset = log_record.last_log_offset();
-
fetch_records.extend(log_record.records(&self.read_context)?);
- self.log_scanner_status
- .update_offset(&table_bucket, last_offset + 1);
- }
- result.insert(table_bucket, fetch_records);
+ } else {
+ self.log_fetch_buffer
+ .set_next_in_line_fetch(Some(completed_fetch));
+ }
+ // Note: poll() already removed the fetch from buffer, so
no need to call poll()
+ } else {
+ // No more fetches available
+ break;
+ }
+ } else {
+ // Fetch records from next_in_line
+ if let Some(mut next_fetch) = next_in_line {
+ let records =
+ self.fetch_records_from_fetch(&mut next_fetch,
records_remaining)?;
+
+ if !records.is_empty() {
+ let table_bucket = next_fetch.table_bucket().clone();
+ // Merge with existing records for this bucket
+ let existing = result.entry(table_bucket).or_default();
+ let records_count = records.len();
+ existing.extend(records);
+
+ records_remaining =
records_remaining.saturating_sub(records_count);
}
+
+ // If the fetch is not fully consumed, put it back for the
next round
+ if !next_fetch.is_consumed() {
+ self.log_fetch_buffer
+ .set_next_in_line_fetch(Some(next_fetch));
+ }
+ // If consumed, next_fetch will be dropped here (which is
correct)
}
}
}
@@ -360,6 +589,83 @@ impl LogFetcher {
Ok(result)
}
+ /// Initialize a completed fetch, checking offset match and updating high
watermark
+ fn initialize_fetch(
+ &self,
+ mut completed_fetch: Box<dyn CompletedFetch>,
+ ) -> Result<Option<Box<dyn CompletedFetch>>> {
+ // todo: handle error in initialize fetch
+ let table_bucket = completed_fetch.table_bucket();
+ let fetch_offset = completed_fetch.next_fetch_offset();
+
+ // Check if bucket is still subscribed
+ let Some(current_offset) =
self.log_scanner_status.get_bucket_offset(table_bucket) else {
+ warn!(
+ "Discarding stale fetch response for bucket {table_bucket:?}
since the bucket has been unsubscribed"
+ );
+ return Ok(None);
+ };
+
+ // Check if offset matches
+ if fetch_offset != current_offset {
+ warn!(
+ "Discarding stale fetch response for bucket {table_bucket:?}
since its offset {fetch_offset} does not match the expected offset
{current_offset}"
+ );
+ return Ok(None);
+ }
+
+ // Update high watermark
+ let high_watermark = completed_fetch.high_watermark();
+ if high_watermark >= 0 {
+ self.log_scanner_status
+ .update_high_watermark(table_bucket, high_watermark);
+ }
+
+ completed_fetch.set_initialized();
+ Ok(Some(completed_fetch))
+ }
+
+ /// Fetch records from a completed fetch, checking offset match
+ fn fetch_records_from_fetch(
+ &self,
+ next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+ max_records: usize,
+ ) -> Result<Vec<ScanRecord>> {
+ let table_bucket = next_in_line_fetch.table_bucket().clone();
+ let current_offset =
self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+ if current_offset.is_none() {
+ warn!(
+ "Ignoring fetched records for {table_bucket:?} since the
bucket has been unsubscribed"
+ );
+ next_in_line_fetch.drain();
+ return Ok(Vec::new());
+ }
+
+ let current_offset = current_offset.unwrap();
+ let fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+ // Check if this fetch is next in line
+ if fetch_offset == current_offset {
+ let records = next_in_line_fetch.fetch_records(max_records)?;
+ let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+ if next_fetch_offset > current_offset {
+ self.log_scanner_status
+ .update_offset(&table_bucket, next_fetch_offset);
+ }
+
+ Ok(records)
+ } else {
+ // These records aren't next in line, ignore them
+ warn!(
+ "Ignoring fetched records for {table_bucket:?} at offset
{fetch_offset} since the current offset is {current_offset}"
+ );
+ next_in_line_fetch.drain();
+ Ok(Vec::new())
+ }
+ }
+
async fn prepare_fetch_log_requests(&self) -> HashMap<i32,
FetchLogRequest> {
let mut fetch_log_req_for_buckets = HashMap::new();
let mut table_id = None;
@@ -372,25 +678,44 @@ impl LogFetcher {
let offset = match
self.log_scanner_status.get_bucket_offset(&bucket) {
Some(offset) => offset,
None => {
- // todo: debug
+ debug!(
+ "Skipping fetch request for bucket {bucket} because
the bucket has been unsubscribed."
+ );
continue;
}
};
- if let Some(leader) = self.get_table_bucket_leader(&bucket) {
- let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
- partition_id: None,
- bucket_id: bucket.bucket_id(),
- fetch_offset: offset,
- // 1M
- max_fetch_bytes: 1024 * 1024,
- };
-
- fetch_log_req_for_buckets
- .entry(leader)
- .or_insert_with(Vec::new)
- .push(fetch_log_req_for_bucket);
- ready_for_fetch_count += 1;
+ match self.get_table_bucket_leader(&bucket) {
+ None => {
+ log::trace!(
+ "Skipping fetch request for bucket {bucket} because
leader is not available."
+ )
+ }
+ Some(leader) => {
+ if self
+ .nodes_with_pending_fetch_requests
+ .lock()
+ .contains(&leader)
+ {
+ log::trace!(
+ "Skipping fetch request for bucket {bucket}
because previous request to server {leader} has not been processed."
+ )
+ } else {
+ let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
+ partition_id: None,
+ bucket_id: bucket.bucket_id(),
+ fetch_offset: offset,
+ // 1M
+ max_fetch_bytes: 1024 * 1024,
+ };
+
+ fetch_log_req_for_buckets
+ .entry(leader)
+ .or_insert_with(Vec::new)
+ .push(fetch_log_req_for_bucket);
+ ready_for_fetch_count += 1;
+ }
+ }
}
}
@@ -427,8 +752,11 @@ impl LogFetcher {
}
fn fetchable_buckets(&self) -> Vec<TableBucket> {
- // always available now
- self.log_scanner_status.fetchable_buckets(|_| true)
+ // Get buckets that are not already in the buffer
+ let buffered = self.log_fetch_buffer.buffered_buckets();
+ let buffered_set: HashSet<TableBucket> =
buffered.into_iter().collect();
+ self.log_scanner_status
+ .fetchable_buckets(|tb| !buffered_set.contains(tb))
}
fn get_table_bucket_leader(&self, tb: &TableBucket) -> Option<i32> {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 9295713..0a803ae 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -39,6 +39,7 @@ use arrow_schema::SchemaRef;
use arrow_schema::{DataType as ArrowDataType, Field};
use byteorder::WriteBytesExt;
use byteorder::{ByteOrder, LittleEndian};
+use bytes::Bytes;
use crc32c::crc32c;
use parking_lot::Mutex;
use std::{
@@ -347,17 +348,17 @@ pub trait ToArrow {
fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
}
-pub struct LogRecordsBatchs<'a> {
- data: &'a [u8],
+pub struct LogRecordsBatches {
+ data: Bytes,
current_pos: usize,
remaining_bytes: usize,
}
-impl<'a> LogRecordsBatchs<'a> {
- pub fn new(data: &'a [u8]) -> Self {
+impl LogRecordsBatches {
+ pub fn new(data: Vec<u8>) -> Self {
let remaining_bytes: usize = data.len();
Self {
- data,
+ data: Bytes::from(data),
current_pos: 0,
remaining_bytes,
}
@@ -378,14 +379,17 @@ impl<'a> LogRecordsBatchs<'a> {
}
}
-impl<'a> Iterator for &'a mut LogRecordsBatchs<'a> {
- type Item = LogRecordBatch<'a>;
+impl Iterator for LogRecordsBatches {
+ type Item = LogRecordBatch;
fn next(&mut self) -> Option<Self::Item> {
match self.next_batch_size() {
Some(batch_size) => {
- let data_slice = &self.data[self.current_pos..self.current_pos
+ batch_size];
- let record_batch = LogRecordBatch::new(data_slice);
+ let start = self.current_pos;
+ let end = start + batch_size;
+ // `Bytes::slice` returns an owned, reference-counted view of the
+ // underlying buffer, so the batch remains valid independently of `self`
+ let record_batch =
LogRecordBatch::new(self.data.slice(start..end));
self.current_pos += batch_size;
self.remaining_bytes -= batch_size;
Some(record_batch)
@@ -395,13 +399,13 @@ impl<'a> Iterator for &'a mut LogRecordsBatchs<'a> {
}
}
-pub struct LogRecordBatch<'a> {
- data: &'a [u8],
+pub struct LogRecordBatch {
+ data: Bytes,
}
#[allow(dead_code)]
-impl<'a> LogRecordBatch<'a> {
- pub fn new(data: &'a [u8]) -> Self {
+impl LogRecordBatch {
+ pub fn new(data: Bytes) -> Self {
LogRecordBatch { data }
}
@@ -710,6 +714,7 @@ pub struct ReadContext {
target_schema: SchemaRef,
full_schema: SchemaRef,
projection: Option<Projection>,
+ is_from_remote: bool,
}
#[derive(Clone)]
@@ -723,24 +728,39 @@ struct Projection {
}
impl ReadContext {
- pub fn new(arrow_schema: SchemaRef) -> ReadContext {
+ pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext {
ReadContext {
target_schema: arrow_schema.clone(),
full_schema: arrow_schema,
projection: None,
+ is_from_remote,
}
}
pub fn with_projection_pushdown(
arrow_schema: SchemaRef,
projected_fields: Vec<usize>,
+ is_from_remote: bool,
) -> ReadContext {
let target_schema = Self::project_schema(arrow_schema.clone(),
projected_fields.as_slice());
- let mut sorted_fields = projected_fields.clone();
- sorted_fields.sort_unstable();
+ // This logic is a little hard to follow; TODO: refactor it to
+ // match the Java-side implementation.
+ let (need_do_reorder, sorted_fields) = {
+ // currently, for remote read, arrow log doesn't support
projection pushdown,
+ // so, only need to do reordering when is not from remote
+ if !is_from_remote {
+ let mut sorted_fields = projected_fields.clone();
+ sorted_fields.sort_unstable();
+ (!sorted_fields.eq(&projected_fields), sorted_fields)
+ } else {
+ // sorted_fields won't be used when need_do_reorder is false,
+ // let's use an empty vec directly
+ (false, vec![])
+ }
+ };
let project = {
- if !sorted_fields.eq(&projected_fields) {
+ if need_do_reorder {
// reordering is required
// Calculate reordering indexes to transform from sorted order
to user-requested order
let mut reordering_indexes =
Vec::with_capacity(projected_fields.len());
@@ -778,6 +798,7 @@ impl ReadContext {
target_schema,
full_schema: arrow_schema,
projection: Some(project),
+ is_from_remote,
}
}
@@ -805,17 +826,24 @@ impl ReadContext {
pub fn record_batch(&self, data: &[u8]) -> Result<RecordBatch> {
let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
- // the record batch from server must be ordered by field pos,
- // according to project to decide what arrow schema to use
- // to parse the record batch
- let resolve_schema = match self.projection {
- Some(ref projection) => {
- // projection, should use ordered schema by project field pos
- projection.ordered_schema.clone()
- }
- None => {
- // no projection, use target output schema
- self.target_schema.clone()
+ let resolve_schema = {
+ // if from remote, no projection, need to use full schema
+ if self.is_from_remote {
+ self.full_schema.clone()
+ } else {
+ // the record batch from server must be ordered by field pos,
+ // according to project to decide what arrow schema to use
+ // to parse the record batch
+ match self.projection {
+ Some(ref projection) => {
+ // projection, should use ordered schema by project
field pos
+ projection.ordered_schema.clone()
+ }
+ None => {
+ // no projection, use target output schema
+ self.target_schema.clone()
+ }
+ }
}
};
@@ -829,14 +857,27 @@ impl ReadContext {
)?;
let record_batch = match &self.projection {
- Some(projection) if projection.reordering_needed => {
- // Reorder columns if needed (when projection pushdown with
non-sorted order)
- let reordered_columns: Vec<_> = projection
- .reordering_indexes
- .iter()
- .map(|&idx| record_batch.column(idx).clone())
- .collect();
- RecordBatch::try_new(self.target_schema.clone(),
reordered_columns)?
+ Some(projection) => {
+ let reordered_columns = {
+ // need to do reorder
+ if self.is_from_remote {
+ Some(&projection.projected_fields)
+ } else if projection.reordering_needed {
+ Some(&projection.reordering_indexes)
+ } else {
+ None
+ }
+ };
+ match reordered_columns {
+ Some(reordered_columns) => {
+ let arrow_columns = reordered_columns
+ .iter()
+ .map(|&idx| record_batch.column(idx).clone())
+ .collect();
+ RecordBatch::try_new(self.target_schema.clone(),
arrow_columns)?
+ }
+ _ => record_batch,
+ }
}
_ => record_batch,
};
diff --git a/crates/fluss/tests/integration/table.rs
b/crates/fluss/tests/integration/table.rs
index a058bfe..9eec98e 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -148,7 +148,7 @@ mod table_test {
}
let scan_records = log_scanner
- .poll(std::time::Duration::from_secs(5))
+ .poll(std::time::Duration::from_secs(60))
.await
.expect("Failed to poll");
@@ -178,7 +178,7 @@ mod table_test {
}
let scan_records_projected = log_scanner_projected
- .poll(std::time::Duration::from_secs(5))
+ .poll(std::time::Duration::from_secs(10))
.await
.expect("Failed to poll");
@@ -227,7 +227,7 @@ mod table_test {
// Poll for records
let scan_records = log_scanner
- .poll(tokio::time::Duration::from_secs(5))
+ .poll(tokio::time::Duration::from_secs(10))
.await
.expect("Failed to poll records");
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs
b/crates/fluss/tests/integration/table_remote_scan.rs
index ca61ff8..bdbced9 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -175,6 +175,8 @@ mod table_remote_scan_test {
let num_buckets = table.table_info().get_num_buckets();
let log_scanner = table
.new_scan()
+ .project(&[1, 0])
+ .unwrap()
.create_log_scanner()
.expect("Failed to create log scanner");
for bucket_id in 0..num_buckets {
@@ -186,7 +188,7 @@ mod table_remote_scan_test {
let mut records = Vec::with_capacity(record_count);
let start = std::time::Instant::now();
- const MAX_WAIT_DURATION: Duration = Duration::from_secs(30);
+ const MAX_WAIT_DURATION: Duration = Duration::from_secs(60);
while records.len() < record_count {
if start.elapsed() > MAX_WAIT_DURATION {
panic!(
@@ -208,8 +210,8 @@ mod table_remote_scan_test {
let row = record.row();
let expected_c1 = i as i32;
let expected_c2 = format!("v{}", i);
- assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}",
i);
- assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index
{}", i);
+ assert_eq!(row.get_int(1), expected_c1, "c1 mismatch at index {}",
i);
+ assert_eq!(row.get_string(0), expected_c2, "c2 mismatch at index
{}", i);
}
}