This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f32411e [chore] Move list offsets to admin (#35)
f32411e is described below
commit f32411e9fef40bb42b3237fffd029dd7def84681
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Oct 20 14:14:28 2025 +0800
[chore] Move list offsets to admin (#35)
---
bindings/python/src/table.rs | 68 +++++++++++---------
bindings/python/src/utils.rs | 4 +-
crates/fluss/src/client/admin.rs | 104 ++++++++++++++++++++++++++++++-
crates/fluss/src/client/metadata.rs | 3 +-
crates/fluss/src/client/table/mod.rs | 2 +
crates/fluss/src/client/table/scanner.rs | 101 ------------------------------
crates/fluss/src/client/write/sender.rs | 2 +-
crates/fluss/src/record/mod.rs | 4 ++
crates/fluss/src/rpc/mod.rs | 2 -
9 files changed, 149 insertions(+), 141 deletions(-)
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 98943b9..c255fa6 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -17,12 +17,11 @@
use crate::TOKIO_RUNTIME;
use crate::*;
+use fluss::client::EARLIEST_OFFSET;
+use fluss::rpc::message::OffsetSpec;
use pyo3_async_runtimes::tokio::future_into_py;
-use std::collections::HashSet;
use std::sync::Arc;
-const EARLIEST_OFFSET: i64 = -2;
-
/// Represents a Fluss table for data operations
#[pyclass]
pub struct FlussTable {
@@ -70,8 +69,12 @@ impl FlussTable {
let rust_scanner = table_scan.create_log_scanner();
- let py_scanner = LogScanner::from_core(rust_scanner,
table_info.clone());
+ let admin = conn
+ .get_admin()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))?;
+ let py_scanner = LogScanner::from_core(rust_scanner, admin,
table_info.clone());
Python::with_gil(|py| Py::new(py, py_scanner))
})
}
@@ -275,6 +278,7 @@ impl AppendWriter {
#[pyclass]
pub struct LogScanner {
inner: fcore::client::LogScanner,
+ admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
#[allow(dead_code)]
start_timestamp: Option<i64>,
@@ -327,50 +331,50 @@ impl LogScanner {
let bucket_ids: Vec<i32> = (0..num_buckets).collect();
// todo: after supporting list_offsets with timestamp, we can use
start_timestamp and end_timestamp here
- let target_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
- .block_on(async { self.inner.list_offsets_latest(bucket_ids).await
})
+ let mut stopping_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
+ .block_on(async {
+ self.admin
+ .list_offsets(
+ &self.table_info.table_path,
+ bucket_ids.as_slice(),
+ OffsetSpec::Latest,
+ )
+ .await
+ })
.map_err(|e| FlussError::new_err(e.to_string()))?;
- let mut current_offsets: HashMap<i32, i64> = HashMap::new();
- let mut completed_buckets: HashSet<i32> = HashSet::new();
-
- if !target_offsets.is_empty() {
+ if !stopping_offsets.is_empty() {
loop {
let batch_result = TOKIO_RUNTIME
.block_on(async {
self.inner.poll(Duration::from_millis(500)).await });
match batch_result {
Ok(scan_records) => {
- let mut filtered_records: HashMap<
- fcore::metadata::TableBucket,
- Vec<fcore::record::ScanRecord>,
- > = HashMap::new();
- for (bucket, records) in
scan_records.records_by_buckets() {
- let bucket_id = bucket.bucket_id();
- if completed_buckets.contains(&bucket_id) {
+ let mut result_records: Vec<fcore::record::ScanRecord>
= vec![];
+ for (bucket, records) in
scan_records.into_records_by_buckets() {
+ let stopping_offset =
stopping_offsets.get(&bucket.bucket_id());
+
+                        if stopping_offset.is_none() {
+                            // skip records for this bucket: it has no stopping
offset entry,
+                            // meaning we have already reached its end offset
continue;
}
if let Some(last_record) = records.last() {
let offset = last_record.offset();
- current_offsets.insert(bucket_id, offset);
- filtered_records.insert(bucket.clone(),
records.clone());
- if offset >= target_offsets[&bucket_id] - 1 {
- completed_buckets.insert(bucket_id);
+ result_records.extend(records);
+ if offset >= stopping_offset.unwrap() - 1 {
+
stopping_offsets.remove(&bucket.bucket_id());
}
}
}
- if !filtered_records.is_empty() {
- let filtered_scan_records =
-
fcore::record::ScanRecords::new(filtered_records);
- let arrow_batch =
-
Utils::convert_scan_records_to_arrow(filtered_scan_records);
+ if !result_records.is_empty() {
+ let arrow_batch =
Utils::convert_scan_records_to_arrow(result_records);
all_batches.extend(arrow_batch);
}
- // completed bucket is equal to all target buckets,
- // we can break scan records
- if completed_buckets.len() == target_offsets.len() {
+                    // we have reached the end offsets of all buckets
+ if stopping_offsets.is_empty() {
break;
}
}
@@ -399,11 +403,13 @@ impl LogScanner {
impl LogScanner {
/// Create LogScanner from core LogScanner
pub fn from_core(
- inner: fcore::client::LogScanner,
+ inner_scanner: fcore::client::LogScanner,
+ admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
) -> Self {
Self {
- inner,
+ inner: inner_scanner,
+ admin,
table_info,
start_timestamp: None,
end_timestamp: None,
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index 9642e9d..93933b3 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -152,9 +152,9 @@ impl Utils {
.map_err(|e| FlussError::new_err(format!("Invalid kv format
'{format_str}': {e}")))
}
- /// Convert ScanRecords to Arrow RecordBatch
+ /// Convert Vec<ScanRecord> to Arrow RecordBatch
pub fn convert_scan_records_to_arrow(
- _scan_records: fcore::record::ScanRecords,
+ _scan_records: Vec<fcore::record::ScanRecord>,
) -> Vec<Arc<arrow::record_batch::RecordBatch>> {
let mut result = Vec::new();
for record in _scan_records {
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index fd0f316..fefab43 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -25,13 +25,16 @@ use crate::rpc::message::{
DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest,
GetTableRequest,
ListDatabasesRequest, ListTablesRequest, TableExistsRequest,
};
+use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
use crate::rpc::{RpcClient, ServerConnection};
-use std::collections::HashMap;
-use std::sync::Arc;
-
+use crate::BucketId;
use crate::error::Result;
use crate::proto::GetTableInfoResponse;
+use std::collections::HashMap;
+use std::slice::from_ref;
+use std::sync::Arc;
+use tokio::task::JoinHandle;
pub struct FlussAdmin {
admin_gateway: ServerConnection,
@@ -216,4 +219,99 @@ impl FlussAdmin {
table_buckets_offset,
))
}
+
+    /// List offsets for the specified buckets. This operation makes it possible
to find
+    /// the beginning offset, end offset, as well as the offset matching a
timestamp in buckets.
+ pub async fn list_offsets(
+ &self,
+ table_path: &TablePath,
+ buckets_id: &[BucketId],
+ offset_spec: OffsetSpec,
+ ) -> Result<HashMap<i32, i64>> {
+ self.metadata
+ .check_and_update_table_metadata(from_ref(table_path))
+ .await?;
+
+ let cluster = self.metadata.get_cluster();
+ let table_id = cluster.get_table(table_path).table_id;
+
+ // Prepare requests
+ let requests_by_server =
+ self.prepare_list_offsets_requests(table_id, None, buckets_id,
offset_spec)?;
+
+ // Send Requests
+ let response_futures =
self.send_list_offsets_request(requests_by_server).await?;
+
+ let mut results = HashMap::new();
+
+ for response_future in response_futures {
+ let offsets = response_future.await.map_err(
+ // todo: consider use suitable error
+ |e| crate::error::Error::WriteError(format!("Fail to get
result: {e}")),
+ )?;
+ results.extend(offsets?);
+ }
+ Ok(results)
+ }
+
+ fn prepare_list_offsets_requests(
+ &self,
+ table_id: i64,
+ partition_id: Option<i64>,
+ buckets: &[BucketId],
+ offset_spec: OffsetSpec,
+ ) -> Result<HashMap<i32, ListOffsetsRequest>> {
+ let cluster = self.metadata.get_cluster();
+ let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
+
+ for bucket_id in buckets {
+ let table_bucket = TableBucket::new(table_id, *bucket_id);
+ let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
+ // todo: consider use another suitable error
+ crate::error::Error::InvalidTableError(format!(
+ "No leader found for table bucket: table_id={table_id},
bucket_id={bucket_id}"
+ ))
+ })?;
+
+ node_for_bucket_list
+ .entry(leader.id())
+ .or_default()
+ .push(*bucket_id);
+ }
+
+ let mut list_offsets_requests = HashMap::new();
+ for (leader_id, bucket_ids) in node_for_bucket_list {
+ let request =
+ ListOffsetsRequest::new(table_id, partition_id, bucket_ids,
offset_spec.clone());
+ list_offsets_requests.insert(leader_id, request);
+ }
+ Ok(list_offsets_requests)
+ }
+
+ async fn send_list_offsets_request(
+ &self,
+ request_map: HashMap<i32, ListOffsetsRequest>,
+ ) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
+ let mut tasks = Vec::new();
+
+ for (leader_id, request) in request_map {
+ let rpc_client = self.rpc_client.clone();
+ let metadata = self.metadata.clone();
+
+ let task = tokio::spawn(async move {
+ let cluster = metadata.get_cluster();
+ let tablet_server =
cluster.get_tablet_server(leader_id).ok_or_else(|| {
+ // todo: consider use more suitable error
+ crate::error::Error::InvalidTableError(format!(
+ "Tablet server {leader_id} not found"
+ ))
+ })?;
+ let connection =
rpc_client.get_connection(tablet_server).await?;
+ let list_offsets_response = connection.request(request).await?;
+ list_offsets_response.offsets()
+ });
+ tasks.push(task);
+ }
+ Ok(tasks)
+ }
}
diff --git a/crates/fluss/src/client/metadata.rs
b/crates/fluss/src/client/metadata.rs
index ebfb959..3c3ba4b 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -17,7 +17,8 @@
use crate::cluster::{Cluster, ServerNode, ServerType};
use crate::metadata::{TableBucket, TablePath};
-use crate::rpc::{RpcClient, ServerConnection, UpdateMetadataRequest};
+use crate::rpc::message::UpdateMetadataRequest;
+use crate::rpc::{RpcClient, ServerConnection};
use parking_lot::RwLock;
use std::collections::HashSet;
use std::net::SocketAddr;
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 07e6494..52ae700 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -22,6 +22,8 @@ use std::sync::Arc;
use crate::error::Result;
+pub const EARLIEST_OFFSET: i64 = -2;
+
mod append;
mod scanner;
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index cbe7248..e1ab59f 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -22,14 +22,12 @@ use crate::metadata::{TableBucket, TableInfo, TablePath};
use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket,
PbFetchLogReqForTable};
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords,
to_arrow_schema};
use crate::rpc::RpcClient;
-use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
use crate::util::FairBucketStatusMap;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::slice::from_ref;
use std::sync::Arc;
use std::time::Duration;
-use tokio::task::JoinHandle;
const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
#[allow(dead_code)]
@@ -67,7 +65,6 @@ pub struct LogScanner {
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
log_fetcher: LogFetcher,
- conns: Arc<RpcClient>,
}
impl LogScanner {
@@ -88,7 +85,6 @@ impl LogScanner {
metadata.clone(),
log_scanner_status.clone(),
),
- conns: connections.clone(),
}
}
@@ -106,103 +102,6 @@ impl LogScanner {
Ok(())
}
- pub async fn list_offsets_latest(&self, buckets: Vec<i32>) ->
Result<HashMap<i32, i64>> {
- // TODO: support partition_id
- let partition_id = None;
- let offset_spec = OffsetSpec::Latest;
-
- self.metadata
- .check_and_update_table_metadata(from_ref(&self.table_path))
- .await?;
-
- let cluster = self.metadata.get_cluster();
- let table_id = cluster.get_table(&self.table_path).table_id;
-
- // Prepare requests
- let requests_by_server = self.prepare_list_offsets_requests(
- table_id,
- partition_id,
- buckets.clone(),
- offset_spec,
- )?;
-
- // Send Requests
- let response_futures =
self.send_list_offsets_request(requests_by_server).await?;
-
- let mut results = HashMap::new();
-
- for response_future in response_futures {
- let offsets = response_future.await.map_err(
- // todo: consider use suitable error
- |e| crate::error::Error::WriteError(format!("Fail to get
result: {e}")),
- )?;
- results.extend(offsets?);
- }
- Ok(results)
- }
-
- fn prepare_list_offsets_requests(
- &self,
- table_id: i64,
- partition_id: Option<i64>,
- buckets: Vec<i32>,
- offset_spec: OffsetSpec,
- ) -> Result<HashMap<i32, ListOffsetsRequest>> {
- let cluster = self.metadata.get_cluster();
- let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
-
- for bucket_id in buckets {
- let table_bucket = TableBucket::new(table_id, bucket_id);
- let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
- // todo: consider use another suitable error
- crate::error::Error::InvalidTableError(format!(
- "No leader found for table bucket: table_id={table_id},
bucket_id={bucket_id}"
- ))
- })?;
-
- node_for_bucket_list
- .entry(leader.id())
- .or_default()
- .push(bucket_id);
- }
-
- let mut list_offsets_requests = HashMap::new();
- for (leader_id, bucket_ids) in node_for_bucket_list {
- let request =
- ListOffsetsRequest::new(table_id, partition_id, bucket_ids,
offset_spec.clone());
- list_offsets_requests.insert(leader_id, request);
- }
- Ok(list_offsets_requests)
- }
-
- async fn send_list_offsets_request(
- &self,
- request_map: HashMap<i32, ListOffsetsRequest>,
- ) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
- let mut tasks = Vec::new();
-
- for (leader_id, request) in request_map {
- let rpc_client = self.conns.clone();
- let metadata = self.metadata.clone();
-
- let task = tokio::spawn(async move {
- let cluster = metadata.get_cluster();
- let tablet_server =
cluster.get_tablet_server(leader_id).ok_or_else(|| {
- // todo: consider use more suitable error
- crate::error::Error::InvalidTableError(format!(
- "Tablet server {leader_id} not found"
- ))
- })?;
- let connection =
rpc_client.get_connection(tablet_server).await?;
- let list_offsets_response = connection.request(request).await?;
- list_offsets_response.offsets()
- });
- tasks.push(task);
- }
-
- Ok(tasks)
- }
-
async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket,
Vec<ScanRecord>>> {
self.log_fetcher.send_fetches_and_collect().await
}
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index 381e10c..e25e2ba 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -21,7 +21,7 @@ use crate::error::Error::WriteError;
use crate::error::Result;
use crate::metadata::TableBucket;
use crate::proto::ProduceLogResponse;
-use crate::rpc::ProduceLogRequest;
+use crate::rpc::message::ProduceLogRequest;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index 07fbe08..35928ea 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -163,6 +163,10 @@ impl ScanRecords {
pub fn records_by_buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>>
{
&self.records
}
+
+ pub fn into_records_by_buckets(self) -> HashMap<TableBucket,
Vec<ScanRecord>> {
+ self.records
+ }
}
impl IntoIterator for ScanRecords {
diff --git a/crates/fluss/src/rpc/mod.rs b/crates/fluss/src/rpc/mod.rs
index 496c015..b8705a3 100644
--- a/crates/fluss/src/rpc/mod.rs
+++ b/crates/fluss/src/rpc/mod.rs
@@ -26,6 +26,4 @@ pub use server_connection::*;
mod convert;
mod transport;
-pub use message::*;
-
pub use convert::*;