This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c8a37f  feat: support ListOffset/SubscribeBatch/DropTable for cpp bindings (#100)
0c8a37f is described below

commit 0c8a37f417858d458230bdcbb03ef3a4eaec7115
Author: AlexZhao <[email protected]>
AuthorDate: Sun Dec 21 15:19:56 2025 +0800

    feat: support ListOffset/SubscribeBatch/DropTable for cpp bindings (#100)
    
    ---------
    
    Co-authored-by: 赵海源 <[email protected]>
---
 bindings/cpp/examples/example.cpp        |  99 +++++++++++++++++++++++--
 bindings/cpp/include/fluss.hpp           |  32 ++++++++
 bindings/cpp/src/admin.cpp               |  42 +++++++++++
 bindings/cpp/src/ffi_converter.hpp       |   1 +
 bindings/cpp/src/lib.rs                  | 122 +++++++++++++++++++++++++++++++
 bindings/cpp/src/table.cpp               |  17 +++++
 bindings/cpp/src/types.rs                |   4 +-
 crates/fluss/src/client/admin.rs         |   7 ++
 crates/fluss/src/client/table/scanner.rs |  22 ++++++
 9 files changed, 340 insertions(+), 6 deletions(-)

diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index 5146f28..04f9ac6 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -19,6 +19,8 @@
 
 #include <iostream>
 #include <vector>
+#include <unordered_map>
+#include <chrono>
 
 static void check(const char* step, const fluss::Result& r) {
     if (!r.Ok()) {
@@ -37,6 +39,17 @@ int main() {
     fluss::Admin admin;
     check("get_admin", conn.GetAdmin(admin));
 
+    fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
+    
+    // 2.1) Drop table if exists
+    std::cout << "Dropping table if exists..." << std::endl;
+    auto drop_result = admin.DropTable(table_path, true);
+    if (drop_result.Ok()) {
+        std::cout << "Table dropped successfully" << std::endl;
+    } else {
+        std::cout << "Table drop result: " << drop_result.error_message << std::endl;
+    }
+
     // 3) Schema & descriptor
     auto schema = fluss::Schema::NewBuilder()
                         .AddColumn("id", fluss::DataType::Int)
@@ -47,14 +60,14 @@ int main() {
 
     auto descriptor = fluss::TableDescriptor::NewBuilder()
                           .SetSchema(schema)
-                          .SetBucketCount(1)
+                          .SetBucketCount(3)
                           .SetProperty("table.log.arrow.compression.type", "NONE")
-                          .SetComment("cpp example table")
+                          .SetComment("cpp example table with 3 buckets")
                           .Build();
 
-    fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
-    // ignore_if_exists=true to allow re-run
-    check("create_table", admin.CreateTable(table_path, descriptor, true));
+    // 3.1) Create table with 3 buckets
+    std::cout << "Creating table with 3 buckets..." << std::endl;
+    check("create_table", admin.CreateTable(table_path, descriptor, false));
 
     // 4) Get table
     fluss::Table table;
@@ -162,5 +175,81 @@ int main() {
         std::exit(1);
     }
 
+    // 8) List offsets examples
+    std::cout << "\n=== List Offsets Examples ===" << std::endl;
+    
+    // 8.1) Query earliest offsets for all buckets
+    std::vector<int32_t> all_bucket_ids;
+    for (int b = 0; b < buckets; ++b) {
+        all_bucket_ids.push_back(b);
+    }
+    
+    std::unordered_map<int32_t, int64_t> earliest_offsets;
+    check("list_earliest_offsets", 
+          admin.ListOffsets(table_path, all_bucket_ids, 
+                           fluss::OffsetQuery::Earliest(), 
+                           earliest_offsets));
+    std::cout << "Earliest offsets:" << std::endl;
+    for (const auto& [bucket_id, offset] : earliest_offsets) {
+        std::cout << "  Bucket " << bucket_id << ": offset=" << offset << std::endl;
+    }
+    
+    // 8.2) Query latest offsets for all buckets
+    std::unordered_map<int32_t, int64_t> latest_offsets;
+    check("list_latest_offsets", 
+          admin.ListOffsets(table_path, all_bucket_ids, 
+                           fluss::OffsetQuery::Latest(), 
+                           latest_offsets));
+    std::cout << "Latest offsets:" << std::endl;
+    for (const auto& [bucket_id, offset] : latest_offsets) {
+        std::cout << "  Bucket " << bucket_id << ": offset=" << offset << std::endl;
+    }
+    
+    // 8.3) Query offsets for a specific timestamp (current time - 1 hour)
+    auto now = std::chrono::system_clock::now();
+    auto one_hour_ago = now - std::chrono::hours(1);
+    auto timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+        one_hour_ago.time_since_epoch()).count();
+    
+    std::unordered_map<int32_t, int64_t> timestamp_offsets;
+    check("list_timestamp_offsets", 
+          admin.ListOffsets(table_path, all_bucket_ids, 
+                           fluss::OffsetQuery::FromTimestamp(timestamp_ms), 
+                           timestamp_offsets));
+    std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" << std::endl;
+    for (const auto& [bucket_id, offset] : timestamp_offsets) {
+        std::cout << "  Bucket " << bucket_id << ": offset=" << offset << std::endl;
+    }
+    
+    // 8.4) Use batch subscribe with offsets from list_offsets
+    std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
+    fluss::LogScanner batch_scanner;
+    check("new_log_scanner_for_batch", table.NewLogScanner(batch_scanner));
+    
+    std::vector<fluss::BucketSubscription> subscriptions;
+    for (const auto& [bucket_id, offset] : earliest_offsets) {
+        subscriptions.push_back({bucket_id, offset});
+        std::cout << "Preparing subscription: bucket=" << bucket_id 
+                  << ", offset=" << offset << std::endl;
+    }
+    
+    check("subscribe_batch", batch_scanner.Subscribe(subscriptions));
+    std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" << std::endl;
+    
+    // 8.5) Poll and verify bucket_id in records
+    fluss::ScanRecords batch_records;
+    check("poll_batch", batch_scanner.Poll(5000, batch_records));
+    
+    std::cout << "Scanned " << batch_records.Size() << " records from batch subscription" << std::endl;
+    for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
+        const auto& rec = batch_records[i];
+        std::cout << "  Record " << i << ": bucket_id=" << rec.bucket_id 
+                  << ", offset=" << rec.offset 
+                  << ", timestamp=" << rec.timestamp << std::endl;
+    }
+    if (batch_records.Size() > 5) {
+        std::cout << "  ... and " << (batch_records.Size() - 5) << " more records" << std::endl;
+    }
+
     return 0;
 }
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 002f806..479adf9 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -63,6 +63,24 @@ enum class DatumType {
     Bytes = 7,
 };
 
+constexpr int64_t EARLIEST_OFFSET = -2;
+constexpr int64_t LATEST_OFFSET = -1;
+
+enum class OffsetSpec {
+    Earliest = 0,
+    Latest = 1,
+    Timestamp = 2,
+};
+
+struct OffsetQuery {
+    OffsetSpec spec;
+    int64_t timestamp{0};
+
+    static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; }
+    static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; }
+    static OffsetQuery FromTimestamp(int64_t ts) { return {OffsetSpec::Timestamp, ts}; }
+};
+
 struct Result {
     int32_t error_code{0};
     std::string error_message;
@@ -301,6 +319,7 @@ private:
 };
 
 struct ScanRecord {
+    int32_t bucket_id;
     int64_t offset;
     int64_t timestamp;
     GenericRow row;
@@ -324,6 +343,11 @@ struct BucketOffset {
     int64_t offset;
 };
 
+struct BucketSubscription {
+    int32_t bucket_id;
+    int64_t offset;
+};
+
 struct LakeSnapshot {
     int64_t snapshot_id;
     std::vector<BucketOffset> bucket_offsets;
@@ -372,10 +396,17 @@ public:
                        const TableDescriptor& descriptor,
                        bool ignore_if_exists = false);
 
+    Result DropTable(const TablePath& table_path, bool ignore_if_not_exists = false);
+
     Result GetTable(const TablePath& table_path, TableInfo& out);
 
     Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out);
 
+    Result ListOffsets(const TablePath& table_path,
+                       const std::vector<int32_t>& bucket_ids,
+                       const OffsetQuery& offset_query,
+                       std::unordered_map<int32_t, int64_t>& out);
+
 private:
     friend class Connection;
     Admin(ffi::Admin* admin) noexcept;
@@ -448,6 +479,7 @@ public:
     bool Available() const;
 
     Result Subscribe(int32_t bucket_id, int64_t start_offset);
+    Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
     Result Poll(int64_t timeout_ms, ScanRecords& out);
 
 private:
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
index f6997a6..bf9c712 100644
--- a/bindings/cpp/src/admin.cpp
+++ b/bindings/cpp/src/admin.cpp
@@ -66,6 +66,16 @@ Result Admin::CreateTable(const TablePath& table_path,
     return utils::from_ffi_result(ffi_result);
 }
 
+Result Admin::DropTable(const TablePath& table_path, bool ignore_if_not_exists) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    auto ffi_result = admin_->drop_table(ffi_path, ignore_if_not_exists);
+    return utils::from_ffi_result(ffi_result);
+}
+
 Result Admin::GetTable(const TablePath& table_path, TableInfo& out) {
     if (!Available()) {
         return utils::make_error(1, "Admin not available");
@@ -98,4 +108,36 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o
     return result;
 }
 
+Result Admin::ListOffsets(const TablePath& table_path,
+                          const std::vector<int32_t>& bucket_ids,
+                          const OffsetQuery& offset_query,
+                          std::unordered_map<int32_t, int64_t>& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    
+    rust::Vec<int32_t> rust_bucket_ids;
+    for (int32_t id : bucket_ids) {
+        rust_bucket_ids.push_back(id);
+    }
+
+    ffi::FfiOffsetQuery ffi_query;
+    ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
+    ffi_query.timestamp = offset_query.timestamp;
+
+    auto ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query);
+    
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out.clear();
+        for (const auto& pair : ffi_result.bucket_offsets) {
+            out[pair.bucket_id] = pair.offset;
+        }
+    }
+
+    return result;
+}
+
 }  // namespace fluss
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 52dd7fe..63a2e91 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -222,6 +222,7 @@ inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) {
 
 inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) {
     return ScanRecord{
+        ffi_record.bucket_id,
         ffi_record.offset,
         ffi_record.timestamp,
         from_ffi_generic_row(ffi_record.row)};
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 54d6941..cd1803b 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -104,6 +104,7 @@ mod ffi {
     }
 
     struct FfiScanRecord {
+        bucket_id: i32,
         offset: i64,
         timestamp: i64,
         row: FfiGenericRow,
@@ -130,6 +131,26 @@ mod ffi {
         offset: i64,
     }
 
+    struct FfiOffsetQuery {
+        offset_type: i32,
+        timestamp: i64,
+    }
+
+    struct FfiBucketSubscription {
+        bucket_id: i32,
+        offset: i64,
+    }
+
+    struct FfiBucketOffsetPair {
+        bucket_id: i32,
+        offset: i64,
+    }
+
+    struct FfiListOffsetsResult {
+        result: FfiResult,
+        bucket_offsets: Vec<FfiBucketOffsetPair>,
+    }
+
     struct FfiLakeSnapshotResult {
         result: FfiResult,
         lake_snapshot: FfiLakeSnapshot,
@@ -156,11 +177,22 @@ mod ffi {
             descriptor: &FfiTableDescriptor,
             ignore_if_exists: bool,
         ) -> FfiResult;
+        fn drop_table(
+            self: &Admin,
+            table_path: &FfiTablePath,
+            ignore_if_not_exists: bool,
+        ) -> FfiResult;
         fn get_table_info(self: &Admin, table_path: &FfiTablePath) -> FfiTableInfoResult;
         fn get_latest_lake_snapshot(
             self: &Admin,
             table_path: &FfiTablePath,
         ) -> FfiLakeSnapshotResult;
+        fn list_offsets(
+            self: &Admin,
+            table_path: &FfiTablePath,
+            bucket_ids: Vec<i32>,
+            offset_query: &FfiOffsetQuery,
+        ) -> FfiListOffsetsResult;
 
         // Table
         unsafe fn delete_table(table: *mut Table);
@@ -182,6 +214,10 @@ mod ffi {
         // LogScanner
         unsafe fn delete_log_scanner(scanner: *mut LogScanner);
         fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> FfiResult;
+        fn subscribe_batch(
+            self: &LogScanner,
+            subscriptions: Vec<FfiBucketSubscription>,
+        ) -> FfiResult;
         fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
     }
 }
@@ -330,6 +366,25 @@ impl Admin {
         }
     }
 
+    fn drop_table(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        ignore_if_not_exists: bool,
+    ) -> ffi::FfiResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let result =
+            RUNTIME.block_on(async { self.inner.drop_table(&path, ignore_if_not_exists).await });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+
     fn get_table_info(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiTableInfoResult {
         let path = fcore::metadata::TablePath::new(
             table_path.database_name.clone(),
@@ -375,6 +430,58 @@ impl Admin {
             },
         }
     }
+
+    fn list_offsets(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        bucket_ids: Vec<i32>,
+        offset_query: &ffi::FfiOffsetQuery,
+    ) -> ffi::FfiListOffsetsResult {
+        use fcore::rpc::message::OffsetSpec;
+
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let offset_spec = match offset_query.offset_type {
+            0 => OffsetSpec::Earliest,
+            1 => OffsetSpec::Latest,
+            2 => OffsetSpec::Timestamp(offset_query.timestamp),
+            _ => {
+                return ffi::FfiListOffsetsResult {
+                    result: err_result(
+                        1,
+                        format!("Invalid offset_type: {}", offset_query.offset_type),
+                    ),
+                    bucket_offsets: vec![],
+                };
+            }
+        };
+
+        let result = RUNTIME.block_on(async {
+            self.inner
+                .list_offsets(&path, &bucket_ids, offset_spec)
+                .await
+        });
+
+        match result {
+            Ok(offsets) => {
+                let bucket_offsets: Vec<ffi::FfiBucketOffsetPair> = offsets
+                    .into_iter()
+                    .map(|(bucket_id, offset)| ffi::FfiBucketOffsetPair { bucket_id, offset })
+                    .collect();
+                ffi::FfiListOffsetsResult {
+                    result: ok_result(),
+                    bucket_offsets,
+                }
+            }
+            Err(e) => ffi::FfiListOffsetsResult {
+                result: err_result(1, e.to_string()),
+                bucket_offsets: vec![],
+            },
+        }
+    }
 }
 
 // Table implementation
@@ -511,6 +618,21 @@ impl LogScanner {
         }
     }
 
+    fn subscribe_batch(&self, subscriptions: Vec<ffi::FfiBucketSubscription>) -> ffi::FfiResult {
+        use std::collections::HashMap;
+        let mut bucket_offsets = HashMap::new();
+        for sub in subscriptions {
+            bucket_offsets.insert(sub.bucket_id, sub.offset);
+        }
+
+        let result = RUNTIME.block_on(async { self.inner.subscribe_batch(bucket_offsets).await });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+
     fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
         let timeout = Duration::from_millis(timeout_ms as u64);
         let result = RUNTIME.block_on(async { self.inner.poll(timeout).await });
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index b28b783..d42e1a2 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -210,6 +210,23 @@ Result LogScanner::Subscribe(int32_t bucket_id, int64_t start_offset) {
     return utils::from_ffi_result(ffi_result);
 }
 
+Result LogScanner::Subscribe(const std::vector<BucketSubscription>& bucket_offsets) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    rust::Vec<ffi::FfiBucketSubscription> rust_subs;
+    for (const auto& sub : bucket_offsets) {
+        ffi::FfiBucketSubscription ffi_sub;
+        ffi_sub.bucket_id = sub.bucket_id;
+        ffi_sub.offset = sub.offset;
+        rust_subs.push_back(ffi_sub);
+    }
+
+    auto ffi_result = scanner_->subscribe_batch(std::move(rust_subs));
+    return utils::from_ffi_result(ffi_result);
+}
+
 Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
     if (!Available()) {
         return utils::make_error(1, "LogScanner not available");
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 8221f22..d95da14 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -233,12 +233,14 @@ pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::Ff
     let mut ffi_records = Vec::new();
 
     // Iterate over all buckets and their records
-    for bucket_records in records.records_by_buckets().values() {
+    for (table_bucket, bucket_records) in records.records_by_buckets() {
+        let bucket_id = table_bucket.bucket_id();
         for record in bucket_records {
             let row = record.row();
             let fields = core_row_to_ffi_fields(row);
 
             ffi_records.push(ffi::FfiScanRecord {
+                bucket_id,
                 offset: record.offset(),
                 timestamp: record.timestamp(),
                 row: ffi::FfiGenericRow { fields },
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index e185af8..6646f97 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -232,6 +232,13 @@ impl FlussAdmin {
             .check_and_update_table_metadata(from_ref(table_path))
             .await?;
 
+        if buckets_id.is_empty() {
+            return Err(Error::UnexpectedError {
+                message: "Buckets are empty.".to_string(),
+                source: None,
+            });
+        }
+
         let cluster = self.metadata.get_cluster();
         let table_id = cluster.get_table(table_path).table_id;
 
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 1e70649..a9384d9 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -185,6 +185,28 @@ impl LogScanner {
         Ok(())
     }
 
+    pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) -> Result<()> {
+        self.metadata
+            .check_and_update_table_metadata(from_ref(&self.table_path))
+            .await?;
+        if bucket_offsets.is_empty() {
+            return Err(Error::UnexpectedError {
+                message: "Bucket offsets are empty.".to_string(),
+                source: None,
+            });
+        }
+
+        let mut scan_bucket_offsets = HashMap::new();
+        for (bucket_id, offset) in bucket_offsets {
+            let table_bucket = TableBucket::new(self.table_id, bucket_id);
+            scan_bucket_offsets.insert(table_bucket, offset);
+        }
+
+        self.log_scanner_status
+            .assign_scan_buckets(scan_bucket_offsets);
+        Ok(())
+    }
+
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
         self.log_fetcher.send_fetches_and_collect().await
     }

Reply via email to