This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0c8a37f feat: support ListOffset/SubscribeBatch/DropTable for cpp bindings (#100)
0c8a37f is described below
commit 0c8a37f417858d458230bdcbb03ef3a4eaec7115
Author: AlexZhao <[email protected]>
AuthorDate: Sun Dec 21 15:19:56 2025 +0800
feat: support ListOffset/SubscribeBatch/DropTable for cpp bindings (#100)
---------
Co-authored-by: 赵海源 <[email protected]>
---
bindings/cpp/examples/example.cpp | 99 +++++++++++++++++++++++--
bindings/cpp/include/fluss.hpp | 32 ++++++++
bindings/cpp/src/admin.cpp | 42 +++++++++++
bindings/cpp/src/ffi_converter.hpp | 1 +
bindings/cpp/src/lib.rs | 122 +++++++++++++++++++++++++++++++
bindings/cpp/src/table.cpp | 17 +++++
bindings/cpp/src/types.rs | 4 +-
crates/fluss/src/client/admin.rs | 7 ++
crates/fluss/src/client/table/scanner.rs | 22 ++++++
9 files changed, 340 insertions(+), 6 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index 5146f28..04f9ac6 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -19,6 +19,8 @@
#include <iostream>
#include <vector>
+#include <unordered_map>
+#include <chrono>
static void check(const char* step, const fluss::Result& r) {
if (!r.Ok()) {
@@ -37,6 +39,17 @@ int main() {
fluss::Admin admin;
check("get_admin", conn.GetAdmin(admin));
+ fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
+
+ // 2.1) Drop table if exists
+ std::cout << "Dropping table if exists..." << std::endl;
+ auto drop_result = admin.DropTable(table_path, true);
+ if (drop_result.Ok()) {
+ std::cout << "Table dropped successfully" << std::endl;
+ } else {
+ std::cout << "Table drop result: " << drop_result.error_message <<
std::endl;
+ }
+
// 3) Schema & descriptor
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int)
@@ -47,14 +60,14 @@ int main() {
auto descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
- .SetBucketCount(1)
+ .SetBucketCount(3)
.SetProperty("table.log.arrow.compression.type",
"NONE")
- .SetComment("cpp example table")
+ .SetComment("cpp example table with 3 buckets")
.Build();
- fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
- // ignore_if_exists=true to allow re-run
- check("create_table", admin.CreateTable(table_path, descriptor, true));
+ // 3.1) Create table with 3 buckets
+ std::cout << "Creating table with 3 buckets..." << std::endl;
+ check("create_table", admin.CreateTable(table_path, descriptor, false));
// 4) Get table
fluss::Table table;
@@ -162,5 +175,81 @@ int main() {
std::exit(1);
}
+ // 8) List offsets examples
+ std::cout << "\n=== List Offsets Examples ===" << std::endl;
+
+ // 8.1) Query earliest offsets for all buckets
+ std::vector<int32_t> all_bucket_ids;
+ for (int b = 0; b < buckets; ++b) {
+ all_bucket_ids.push_back(b);
+ }
+
+ std::unordered_map<int32_t, int64_t> earliest_offsets;
+ check("list_earliest_offsets",
+ admin.ListOffsets(table_path, all_bucket_ids,
+ fluss::OffsetQuery::Earliest(),
+ earliest_offsets));
+ std::cout << "Earliest offsets:" << std::endl;
+ for (const auto& [bucket_id, offset] : earliest_offsets) {
+ std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
+ }
+
+ // 8.2) Query latest offsets for all buckets
+ std::unordered_map<int32_t, int64_t> latest_offsets;
+ check("list_latest_offsets",
+ admin.ListOffsets(table_path, all_bucket_ids,
+ fluss::OffsetQuery::Latest(),
+ latest_offsets));
+ std::cout << "Latest offsets:" << std::endl;
+ for (const auto& [bucket_id, offset] : latest_offsets) {
+ std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
+ }
+
+ // 8.3) Query offsets for a specific timestamp (current time - 1 hour)
+ auto now = std::chrono::system_clock::now();
+ auto one_hour_ago = now - std::chrono::hours(1);
+ auto timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+ one_hour_ago.time_since_epoch()).count();
+
+ std::unordered_map<int32_t, int64_t> timestamp_offsets;
+ check("list_timestamp_offsets",
+ admin.ListOffsets(table_path, all_bucket_ids,
+ fluss::OffsetQuery::FromTimestamp(timestamp_ms),
+ timestamp_offsets));
+ std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):"
<< std::endl;
+ for (const auto& [bucket_id, offset] : timestamp_offsets) {
+ std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
+ }
+
+ // 8.4) Use batch subscribe with offsets from list_offsets
+ std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
+ fluss::LogScanner batch_scanner;
+ check("new_log_scanner_for_batch", table.NewLogScanner(batch_scanner));
+
+ std::vector<fluss::BucketSubscription> subscriptions;
+ for (const auto& [bucket_id, offset] : earliest_offsets) {
+ subscriptions.push_back({bucket_id, offset});
+ std::cout << "Preparing subscription: bucket=" << bucket_id
+ << ", offset=" << offset << std::endl;
+ }
+
+ check("subscribe_batch", batch_scanner.Subscribe(subscriptions));
+ std::cout << "Batch subscribed to " << subscriptions.size() << " buckets"
<< std::endl;
+
+ // 8.5) Poll and verify bucket_id in records
+ fluss::ScanRecords batch_records;
+ check("poll_batch", batch_scanner.Poll(5000, batch_records));
+
+ std::cout << "Scanned " << batch_records.Size() << " records from batch
subscription" << std::endl;
+ for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
+ const auto& rec = batch_records[i];
+ std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
+ << ", offset=" << rec.offset
+ << ", timestamp=" << rec.timestamp << std::endl;
+ }
+ if (batch_records.Size() > 5) {
+ std::cout << " ... and " << (batch_records.Size() - 5) << " more
records" << std::endl;
+ }
+
return 0;
}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 002f806..479adf9 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -63,6 +63,24 @@ enum class DatumType {
Bytes = 7,
};
+constexpr int64_t EARLIEST_OFFSET = -2;
+constexpr int64_t LATEST_OFFSET = -1;
+
+enum class OffsetSpec {
+ Earliest = 0,
+ Latest = 1,
+ Timestamp = 2,
+};
+
+struct OffsetQuery {
+ OffsetSpec spec;
+ int64_t timestamp{0};
+
+ static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; }
+ static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; }
+ static OffsetQuery FromTimestamp(int64_t ts) { return
{OffsetSpec::Timestamp, ts}; }
+};
+
struct Result {
int32_t error_code{0};
std::string error_message;
@@ -301,6 +319,7 @@ private:
};
struct ScanRecord {
+ int32_t bucket_id;
int64_t offset;
int64_t timestamp;
GenericRow row;
@@ -324,6 +343,11 @@ struct BucketOffset {
int64_t offset;
};
+struct BucketSubscription {
+ int32_t bucket_id;
+ int64_t offset;
+};
+
struct LakeSnapshot {
int64_t snapshot_id;
std::vector<BucketOffset> bucket_offsets;
@@ -372,10 +396,17 @@ public:
const TableDescriptor& descriptor,
bool ignore_if_exists = false);
+ Result DropTable(const TablePath& table_path, bool ignore_if_not_exists =
false);
+
Result GetTable(const TablePath& table_path, TableInfo& out);
Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot&
out);
+ Result ListOffsets(const TablePath& table_path,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out);
+
private:
friend class Connection;
Admin(ffi::Admin* admin) noexcept;
@@ -448,6 +479,7 @@ public:
bool Available() const;
Result Subscribe(int32_t bucket_id, int64_t start_offset);
+ Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
Result Poll(int64_t timeout_ms, ScanRecords& out);
private:
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
index f6997a6..bf9c712 100644
--- a/bindings/cpp/src/admin.cpp
+++ b/bindings/cpp/src/admin.cpp
@@ -66,6 +66,16 @@ Result Admin::CreateTable(const TablePath& table_path,
return utils::from_ffi_result(ffi_result);
}
+Result Admin::DropTable(const TablePath& table_path, bool
ignore_if_not_exists) {
+ if (!Available()) {
+ return utils::make_error(1, "Admin not available");
+ }
+
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+ auto ffi_result = admin_->drop_table(ffi_path, ignore_if_not_exists);
+ return utils::from_ffi_result(ffi_result);
+}
+
Result Admin::GetTable(const TablePath& table_path, TableInfo& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
@@ -98,4 +108,36 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o
return result;
}
+Result Admin::ListOffsets(const TablePath& table_path,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Admin not available");
+ }
+
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+
+ rust::Vec<int32_t> rust_bucket_ids;
+ for (int32_t id : bucket_ids) {
+ rust_bucket_ids.push_back(id);
+ }
+
+ ffi::FfiOffsetQuery ffi_query;
+ ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
+ ffi_query.timestamp = offset_query.timestamp;
+
+ auto ffi_result = admin_->list_offsets(ffi_path,
std::move(rust_bucket_ids), ffi_query);
+
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out.clear();
+ for (const auto& pair : ffi_result.bucket_offsets) {
+ out[pair.bucket_id] = pair.offset;
+ }
+ }
+
+ return result;
+}
+
} // namespace fluss
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 52dd7fe..63a2e91 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -222,6 +222,7 @@ inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) {
inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) {
return ScanRecord{
+ ffi_record.bucket_id,
ffi_record.offset,
ffi_record.timestamp,
from_ffi_generic_row(ffi_record.row)};
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 54d6941..cd1803b 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -104,6 +104,7 @@ mod ffi {
}
struct FfiScanRecord {
+ bucket_id: i32,
offset: i64,
timestamp: i64,
row: FfiGenericRow,
@@ -130,6 +131,26 @@ mod ffi {
offset: i64,
}
+ struct FfiOffsetQuery {
+ offset_type: i32,
+ timestamp: i64,
+ }
+
+ struct FfiBucketSubscription {
+ bucket_id: i32,
+ offset: i64,
+ }
+
+ struct FfiBucketOffsetPair {
+ bucket_id: i32,
+ offset: i64,
+ }
+
+ struct FfiListOffsetsResult {
+ result: FfiResult,
+ bucket_offsets: Vec<FfiBucketOffsetPair>,
+ }
+
struct FfiLakeSnapshotResult {
result: FfiResult,
lake_snapshot: FfiLakeSnapshot,
@@ -156,11 +177,22 @@ mod ffi {
descriptor: &FfiTableDescriptor,
ignore_if_exists: bool,
) -> FfiResult;
+ fn drop_table(
+ self: &Admin,
+ table_path: &FfiTablePath,
+ ignore_if_not_exists: bool,
+ ) -> FfiResult;
fn get_table_info(self: &Admin, table_path: &FfiTablePath) ->
FfiTableInfoResult;
fn get_latest_lake_snapshot(
self: &Admin,
table_path: &FfiTablePath,
) -> FfiLakeSnapshotResult;
+ fn list_offsets(
+ self: &Admin,
+ table_path: &FfiTablePath,
+ bucket_ids: Vec<i32>,
+ offset_query: &FfiOffsetQuery,
+ ) -> FfiListOffsetsResult;
// Table
unsafe fn delete_table(table: *mut Table);
@@ -182,6 +214,10 @@ mod ffi {
// LogScanner
unsafe fn delete_log_scanner(scanner: *mut LogScanner);
fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) ->
FfiResult;
+ fn subscribe_batch(
+ self: &LogScanner,
+ subscriptions: Vec<FfiBucketSubscription>,
+ ) -> FfiResult;
fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
}
}
@@ -330,6 +366,25 @@ impl Admin {
}
}
+ fn drop_table(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ ignore_if_not_exists: bool,
+ ) -> ffi::FfiResult {
+ let path = fcore::metadata::TablePath::new(
+ table_path.database_name.clone(),
+ table_path.table_name.clone(),
+ );
+
+ let result =
+ RUNTIME.block_on(async { self.inner.drop_table(&path,
ignore_if_not_exists).await });
+
+ match result {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ }
+
fn get_table_info(&self, table_path: &ffi::FfiTablePath) ->
ffi::FfiTableInfoResult {
let path = fcore::metadata::TablePath::new(
table_path.database_name.clone(),
@@ -375,6 +430,58 @@ impl Admin {
},
}
}
+
+ fn list_offsets(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ bucket_ids: Vec<i32>,
+ offset_query: &ffi::FfiOffsetQuery,
+ ) -> ffi::FfiListOffsetsResult {
+ use fcore::rpc::message::OffsetSpec;
+
+ let path = fcore::metadata::TablePath::new(
+ table_path.database_name.clone(),
+ table_path.table_name.clone(),
+ );
+
+ let offset_spec = match offset_query.offset_type {
+ 0 => OffsetSpec::Earliest,
+ 1 => OffsetSpec::Latest,
+ 2 => OffsetSpec::Timestamp(offset_query.timestamp),
+ _ => {
+ return ffi::FfiListOffsetsResult {
+ result: err_result(
+ 1,
+ format!("Invalid offset_type: {}",
offset_query.offset_type),
+ ),
+ bucket_offsets: vec![],
+ };
+ }
+ };
+
+ let result = RUNTIME.block_on(async {
+ self.inner
+ .list_offsets(&path, &bucket_ids, offset_spec)
+ .await
+ });
+
+ match result {
+ Ok(offsets) => {
+ let bucket_offsets: Vec<ffi::FfiBucketOffsetPair> = offsets
+ .into_iter()
+ .map(|(bucket_id, offset)| ffi::FfiBucketOffsetPair {
bucket_id, offset })
+ .collect();
+ ffi::FfiListOffsetsResult {
+ result: ok_result(),
+ bucket_offsets,
+ }
+ }
+ Err(e) => ffi::FfiListOffsetsResult {
+ result: err_result(1, e.to_string()),
+ bucket_offsets: vec![],
+ },
+ }
+ }
}
// Table implementation
@@ -511,6 +618,21 @@ impl LogScanner {
}
}
+ fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>)
-> ffi::FfiResult {
+ use std::collections::HashMap;
+ let mut bucket_offsets = HashMap::new();
+ for sub in subscriptions {
+ bucket_offsets.insert(sub.bucket_id, sub.offset);
+ }
+
+ let result = RUNTIME.block_on(async {
self.inner.subscribe_batch(bucket_offsets).await });
+
+ match result {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ }
+
fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
let timeout = Duration::from_millis(timeout_ms as u64);
let result = RUNTIME.block_on(async { self.inner.poll(timeout).await
});
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index b28b783..d42e1a2 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -210,6 +210,23 @@ Result LogScanner::Subscribe(int32_t bucket_id, int64_t start_offset) {
return utils::from_ffi_result(ffi_result);
}
+Result LogScanner::Subscribe(const std::vector<BucketSubscription>&
bucket_offsets) {
+ if (!Available()) {
+ return utils::make_error(1, "LogScanner not available");
+ }
+
+ rust::Vec<ffi::FfiBucketSubscription> rust_subs;
+ for (const auto& sub : bucket_offsets) {
+ ffi::FfiBucketSubscription ffi_sub;
+ ffi_sub.bucket_id = sub.bucket_id;
+ ffi_sub.offset = sub.offset;
+ rust_subs.push_back(ffi_sub);
+ }
+
+ auto ffi_result = scanner_->subscribe_batch(std::move(rust_subs));
+ return utils::from_ffi_result(ffi_result);
+}
+
Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
if (!Available()) {
return utils::make_error(1, "LogScanner not available");
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 8221f22..d95da14 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -233,12 +233,14 @@ pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::Ff
let mut ffi_records = Vec::new();
// Iterate over all buckets and their records
- for bucket_records in records.records_by_buckets().values() {
+ for (table_bucket, bucket_records) in records.records_by_buckets() {
+ let bucket_id = table_bucket.bucket_id();
for record in bucket_records {
let row = record.row();
let fields = core_row_to_ffi_fields(row);
ffi_records.push(ffi::FfiScanRecord {
+ bucket_id,
offset: record.offset(),
timestamp: record.timestamp(),
row: ffi::FfiGenericRow { fields },
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index e185af8..6646f97 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -232,6 +232,13 @@ impl FlussAdmin {
.check_and_update_table_metadata(from_ref(table_path))
.await?;
+ if buckets_id.is_empty() {
+ return Err(Error::UnexpectedError {
+ message: "Buckets are empty.".to_string(),
+ source: None,
+ });
+ }
+
let cluster = self.metadata.get_cluster();
let table_id = cluster.get_table(table_path).table_id;
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 1e70649..a9384d9 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -185,6 +185,28 @@ impl LogScanner {
Ok(())
}
+ pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) ->
Result<()> {
+ self.metadata
+ .check_and_update_table_metadata(from_ref(&self.table_path))
+ .await?;
+ if bucket_offsets.is_empty() {
+ return Err(Error::UnexpectedError {
+ message: "Bucket offsets are empty.".to_string(),
+ source: None,
+ });
+ }
+
+ let mut scan_bucket_offsets = HashMap::new();
+ for (bucket_id, offset) in bucket_offsets {
+ let table_bucket = TableBucket::new(self.table_id, bucket_id);
+ scan_bucket_offsets.insert(table_bucket, offset);
+ }
+
+ self.log_scanner_status
+ .assign_scan_buckets(scan_bucket_offsets);
+ Ok(())
+ }
+
async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket,
Vec<ScanRecord>>> {
self.log_fetcher.send_fetches_and_collect().await
}