This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a34f1c1 feat: introduce cpp bindings (#83)
a34f1c1 is described below
commit a34f1c1d438534bb9ec3de5f1830d600aec09839
Author: AlexZhao <[email protected]>
AuthorDate: Sat Dec 13 15:14:05 2025 +0800
feat: introduce cpp bindings (#83)
---------
Co-authored-by: 赵海源 <[email protected]>
---
Cargo.toml | 2 +-
Cargo.toml => bindings/cpp/.clang-format | 25 +-
bindings/cpp/.gitignore | 7 +
bindings/cpp/CMakeLists.txt | 107 +++++
Cargo.toml => bindings/cpp/Cargo.toml | 32 +-
.../fluss/src/config.rs => bindings/cpp/build.rs | 25 +-
bindings/cpp/examples/example.cpp | 166 +++++++
bindings/cpp/include/fluss.hpp | 461 ++++++++++++++++++
bindings/cpp/src/admin.cpp | 101 ++++
bindings/cpp/src/connection.cpp | 95 ++++
bindings/cpp/src/ffi_converter.hpp | 256 ++++++++++
bindings/cpp/src/lib.rs | 523 +++++++++++++++++++++
bindings/cpp/src/table.cpp | 228 +++++++++
bindings/cpp/src/types.rs | 485 +++++++++++++++++++
crates/fluss/src/config.rs | 14 +-
15 files changed, 2467 insertions(+), 60 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 15bcb79..b4ac03b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,7 +28,7 @@ rust-version = "1.85"
[workspace]
resolver = "2"
-members = ["crates/fluss", "crates/examples", "bindings/python"]
+members = ["crates/fluss", "crates/examples", "bindings/python",
"bindings/cpp"]
[workspace.dependencies]
fluss = { version = "0.1.0", path = "./crates/fluss" }
diff --git a/Cargo.toml b/bindings/cpp/.clang-format
similarity index 54%
copy from Cargo.toml
copy to bindings/cpp/.clang-format
index 15bcb79..1c31900 100644
--- a/Cargo.toml
+++ b/bindings/cpp/.clang-format
@@ -15,24 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-[workspace.package]
-categories = ["command-line-utilities"]
-description = "The rust implementation of fluss"
-repository = "https://github.com/apache/fluss-rust"
-name = "fluss"
-edition = "2024"
-version = "0.1.0"
-license = "Apache-2.0"
-rust-version = "1.85"
-
-
-[workspace]
-resolver = "2"
-members = ["crates/fluss", "crates/examples", "bindings/python"]
-
-[workspace.dependencies]
-fluss = { version = "0.1.0", path = "./crates/fluss" }
-tokio = { version = "1.44.2", features = ["full"] }
-clap = { version = "4.5.37", features = ["derive"] }
-arrow = { version = "57.0.0", features = ["ipc_compression"] }
-chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+---
+BasedOnStyle: Google
+ColumnLimit: 100
+IndentWidth: 4
diff --git a/bindings/cpp/.gitignore b/bindings/cpp/.gitignore
new file mode 100644
index 0000000..6836e70
--- /dev/null
+++ b/bindings/cpp/.gitignore
@@ -0,0 +1,7 @@
+build/
+cmake-build-*/
+.idea/
+*.o
+*.a
+*.so
+*.dylib
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
new file mode 100644
index 0000000..629f3f0
--- /dev/null
+++ b/bindings/cpp/CMakeLists.txt
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 3.22)
+
+if (POLICY CMP0135)
+ cmake_policy(SET CMP0135 NEW)
+endif()
+
+project(fluss-cpp LANGUAGES CXX)
+
+include(FetchContent)
+set(FLUSS_GOOGLETEST_VERSION 1.15.2 CACHE STRING "version of GoogleTest")
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+
+find_package(Threads REQUIRED)
+
+if (NOT CMAKE_BUILD_TYPE)
+ set(CMAKE_BUILD_TYPE Debug)
+endif()
+
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+option(FLUSS_ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" OFF)
+option(FLUSS_ENABLE_TESTING "Enable building test binary for fluss" OFF)
+option(FLUSS_DEV "Enable dev mode" OFF)
+
+if (FLUSS_DEV)
+ set(FLUSS_ENABLE_ADDRESS_SANITIZER ON)
+ set(FLUSS_ENABLE_TESTING ON)
+endif()
+
+# Get cargo target dir
+execute_process(COMMAND cargo locate-project --workspace --message-format plain
+ OUTPUT_VARIABLE CARGO_TARGET_DIR
+ WORKING_DIRECTORY ${PROJECT_SOURCE_DIR})
+string(REGEX REPLACE "/Cargo.toml\n$" "/target" CARGO_TARGET_DIR
"${CARGO_TARGET_DIR}")
+
+set(CARGO_MANIFEST ${PROJECT_SOURCE_DIR}/Cargo.toml)
+set(RUST_SOURCE_FILE ${PROJECT_SOURCE_DIR}/src/lib.rs)
+set(RUST_BRIDGE_CPP ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.cc)
+set(RUST_HEADER_FILE ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.h)
+
+if (CMAKE_BUILD_TYPE STREQUAL "Debug")
+ set(RUST_LIB
${CARGO_TARGET_DIR}/debug/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
+else()
+ set(RUST_LIB
${CARGO_TARGET_DIR}/release/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
+endif()
+
+set(CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include
+ ${PROJECT_SOURCE_DIR}/src
+ ${CARGO_TARGET_DIR}/cxxbridge
+ ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src)
+
+file(GLOB CPP_SOURCE_FILE "src/*.cpp")
+file(GLOB CPP_HEADER_FILE "include/*.hpp")
+
+if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
+ list(APPEND CARGO_BUILD_FLAGS "--release")
+endif()
+
+add_custom_target(cargo_build
+ COMMAND cargo build --manifest-path ${CARGO_MANIFEST} ${CARGO_BUILD_FLAGS}
+ BYPRODUCTS ${RUST_BRIDGE_CPP} ${RUST_LIB} ${RUST_HEADER_FILE}
+ DEPENDS ${RUST_SOURCE_FILE}
+ USES_TERMINAL
+ COMMENT "Running cargo..."
+)
+
+add_library(fluss_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP})
+target_sources(fluss_cpp PUBLIC ${CPP_HEADER_FILE})
+target_sources(fluss_cpp PRIVATE ${RUST_HEADER_FILE})
+target_include_directories(fluss_cpp PUBLIC ${CPP_INCLUDE_DIR})
+target_link_libraries(fluss_cpp PUBLIC ${RUST_LIB})
+target_link_libraries(fluss_cpp PRIVATE ${CMAKE_DL_LIBS} Threads::Threads)
+if(APPLE)
+ target_link_libraries(fluss_cpp PUBLIC "-framework CoreFoundation"
"-framework Security")
+endif()
+
+add_executable(fluss_cpp_example examples/example.cpp)
+target_link_libraries(fluss_cpp_example fluss_cpp)
+target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})
+
+set_target_properties(fluss_cpp
+ PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
+)
+add_dependencies(fluss_cpp cargo_build)
+
+if (FLUSS_ENABLE_ADDRESS_SANITIZER)
+ target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined
-fno-omit-frame-pointer -fno-common -O1)
+ target_link_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined)
+endif()
\ No newline at end of file
diff --git a/Cargo.toml b/bindings/cpp/Cargo.toml
similarity index 55%
copy from Cargo.toml
copy to bindings/cpp/Cargo.toml
index 15bcb79..2d3d913 100644
--- a/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -15,24 +15,22 @@
# specific language governing permissions and limitations
# under the License.
-[workspace.package]
-categories = ["command-line-utilities"]
-description = "The rust implementation of fluss"
-repository = "https://github.com/apache/fluss-rust"
-name = "fluss"
-edition = "2024"
+[package]
+name = "fluss-cpp"
version = "0.1.0"
-license = "Apache-2.0"
-rust-version = "1.85"
+edition.workspace = true
+rust-version.workspace = true
+publish = false
+[lib]
+crate-type = ["staticlib"]
-[workspace]
-resolver = "2"
-members = ["crates/fluss", "crates/examples", "bindings/python"]
+[dependencies]
+anyhow = "1.0"
+arrow = { workspace = true }
+cxx = "1.0"
+fluss = { path = "../../crates/fluss" }
+tokio = { version = "1.27", features = ["rt-multi-thread", "macros"] }
-[workspace.dependencies]
-fluss = { version = "0.1.0", path = "./crates/fluss" }
-tokio = { version = "1.44.2", features = ["full"] }
-clap = { version = "4.5.37", features = ["derive"] }
-arrow = { version = "57.0.0", features = ["ipc_compression"] }
-chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+[build-dependencies]
+cxx-build = "1.0"
diff --git a/crates/fluss/src/config.rs b/bindings/cpp/build.rs
similarity index 55%
copy from crates/fluss/src/config.rs
copy to bindings/cpp/build.rs
index 0857496..ec75e24 100644
--- a/crates/fluss/src/config.rs
+++ b/bindings/cpp/build.rs
@@ -15,25 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use clap::Parser;
-use serde::{Deserialize, Serialize};
+fn main() {
+ cxx_build::bridge("src/lib.rs")
+ .std("c++17")
+ .compile("fluss-cpp-bridge");
-#[derive(Parser, Debug, Clone, Deserialize, Serialize, Default)]
-#[command(author, version, about, long_about = None)]
-pub struct Config {
- #[arg(long)]
- #[serde(skip_serializing_if = "Option::is_none")]
- pub bootstrap_server: Option<String>,
-
- #[arg(long, default_value_t = 10 * 1024 * 1024)]
- pub request_max_size: i32,
-
- #[arg(long, default_value_t = String::from("all"))]
- pub writer_acks: String,
-
- #[arg(long, default_value_t = i32::MAX)]
- pub writer_retries: i32,
-
- #[arg(long, default_value_t = 2 * 1024 * 1024)]
- pub writer_batch_size: i32,
+ println!("cargo:rerun-if-changed=src/lib.rs");
}
diff --git a/bindings/cpp/examples/example.cpp
b/bindings/cpp/examples/example.cpp
new file mode 100644
index 0000000..5146f28
--- /dev/null
+++ b/bindings/cpp/examples/example.cpp
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "fluss.hpp"
+
+#include <iostream>
+#include <vector>
+
+static void check(const char* step, const fluss::Result& r) {
+ if (!r.Ok()) {
+ std::cerr << step << " failed: code=" << r.error_code
+ << " msg=" << r.error_message << std::endl;
+ std::exit(1);
+ }
+}
+
+int main() {
+ // 1) Connect
+ fluss::Connection conn;
+ check("connect", fluss::Connection::Connect("127.0.0.1:9123", conn));
+
+ // 2) Admin
+ fluss::Admin admin;
+ check("get_admin", conn.GetAdmin(admin));
+
+ // 3) Schema & descriptor
+ auto schema = fluss::Schema::NewBuilder()
+ .AddColumn("id", fluss::DataType::Int)
+ .AddColumn("name", fluss::DataType::String)
+ .AddColumn("score", fluss::DataType::Float)
+ .AddColumn("age", fluss::DataType::Int)
+ .Build();
+
+ auto descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(schema)
+ .SetBucketCount(1)
+ .SetProperty("table.log.arrow.compression.type",
"NONE")
+ .SetComment("cpp example table")
+ .Build();
+
+ fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
+ // ignore_if_exists=true to allow re-run
+ check("create_table", admin.CreateTable(table_path, descriptor, true));
+
+ // 4) Get table
+ fluss::Table table;
+ check("get_table", conn.GetTable(table_path, table));
+
+ // 5) Writer
+ fluss::AppendWriter writer;
+ check("new_append_writer", table.NewAppendWriter(writer));
+
+ struct RowData {
+ int id;
+ const char* name;
+ float score;
+ int age;
+ };
+
+ std::vector<RowData> rows = {
+ {1, "Alice", 95.2f, 25},
+ {2, "Bob", 87.2f, 30},
+ {3, "Charlie", 92.1f, 35},
+ };
+
+ for (const auto& r : rows) {
+ fluss::GenericRow row;
+ row.SetInt32(0, r.id);
+ row.SetString(1, r.name);
+ row.SetFloat32(2, r.score);
+ row.SetInt32(3, r.age);
+ check("append", writer.Append(row));
+ }
+ check("flush", writer.Flush());
+ std::cout << "Wrote " << rows.size() << " rows" << std::endl;
+
+ // 6) Scan
+ fluss::LogScanner scanner;
+ check("new_log_scanner", table.NewLogScanner(scanner));
+
+ auto info = table.GetTableInfo();
+ int buckets = info.num_buckets;
+ for (int b = 0; b < buckets; ++b) {
+ check("subscribe", scanner.Subscribe(b, 0));
+ }
+
+ fluss::ScanRecords records;
+ check("poll", scanner.Poll(5000, records));
+
+ std::cout << "Scanned records: " << records.records.size() << std::endl;
+ for (const auto& rec : records.records) {
+ std::cout << " offset=" << rec.offset << " id=" <<
rec.row.fields[0].i32_val
+ << " name=" << rec.row.fields[1].string_val
+ << " score=" << rec.row.fields[2].f32_val << " age=" <<
rec.row.fields[3].i32_val
+ << " ts=" << rec.timestamp << std::endl;
+ }
+
+ // 7) Project only id (0) and name (1) columns
+ std::vector<size_t> projected_columns = {0, 1};
+ fluss::LogScanner projected_scanner;
+ check("new_log_scanner_with_projection",
+ table.NewLogScannerWithProjection(projected_columns,
projected_scanner));
+
+ for (int b = 0; b < buckets; ++b) {
+ check("subscribe_projected", projected_scanner.Subscribe(b, 0));
+ }
+
+ fluss::ScanRecords projected_records;
+ check("poll_projected", projected_scanner.Poll(5000, projected_records));
+
+ std::cout << "Projected records: " << projected_records.records.size() <<
std::endl;
+
+ bool projection_verified = true;
+ for (size_t i = 0; i < projected_records.records.size(); ++i) {
+ const auto& rec = projected_records.records[i];
+ const auto& row = rec.row;
+
+ if (row.fields.size() != projected_columns.size()) {
+ std::cerr << "ERROR: Record " << i << " has " << row.fields.size()
+ << " fields, expected " << projected_columns.size() <<
std::endl;
+ projection_verified = false;
+ continue;
+ }
+
+ // Verify field types match expected columns
+ // Column 0 (id) should be Int32, Column 1 (name) should be String
+ if (row.fields[0].type != fluss::DatumType::Int32) {
+ std::cerr << "ERROR: Record " << i << " field 0 type mismatch,
expected Int32" << std::endl;
+ projection_verified = false;
+ }
+ if (row.fields[1].type != fluss::DatumType::String) {
+ std::cerr << "ERROR: Record " << i << " field 1 type mismatch,
expected String" << std::endl;
+ projection_verified = false;
+ }
+
+ // Print projected data
+ if (row.fields[0].type == fluss::DatumType::Int32 &&
+ row.fields[1].type == fluss::DatumType::String) {
+ std::cout << " Record " << i << ": id=" << row.fields[0].i32_val
+ << ", name=" << row.fields[1].string_val << std::endl;
+ }
+ }
+
+ if (projection_verified) {
+ std::cout << "Column pruning verification passed!" << std::endl;
+ } else {
+ std::cerr << "Column pruning verification failed!" << std::endl;
+ std::exit(1);
+ }
+
+ return 0;
+}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
new file mode 100644
index 0000000..002f806
--- /dev/null
+++ b/bindings/cpp/include/fluss.hpp
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+namespace fluss {
+
+namespace ffi {
+ struct Connection;
+ struct Admin;
+ struct Table;
+ struct AppendWriter;
+ struct LogScanner;
+} // namespace ffi
+
+enum class DataType {
+ Boolean = 1,
+ TinyInt = 2,
+ SmallInt = 3,
+ Int = 4,
+ BigInt = 5,
+ Float = 6,
+ Double = 7,
+ String = 8,
+ Bytes = 9,
+ Date = 10,
+ Time = 11,
+ Timestamp = 12,
+ TimestampLtz = 13,
+};
+
+enum class DatumType {
+ Null = 0,
+ Bool = 1,
+ Int32 = 2,
+ Int64 = 3,
+ Float32 = 4,
+ Float64 = 5,
+ String = 6,
+ Bytes = 7,
+};
+
+struct Result {
+ int32_t error_code{0};
+ std::string error_message;
+
+ bool Ok() const { return error_code == 0; }
+};
+
+struct TablePath {
+ std::string database_name;
+ std::string table_name;
+
+ TablePath() = default;
+ TablePath(std::string db, std::string tbl)
+ : database_name(std::move(db)), table_name(std::move(tbl)) {}
+
+ std::string ToString() const { return database_name + "." + table_name; }
+};
+
+struct Column {
+ std::string name;
+ DataType data_type;
+ std::string comment;
+};
+
+struct Schema {
+ std::vector<Column> columns;
+ std::vector<std::string> primary_keys;
+
+ class Builder {
+ public:
+ Builder& AddColumn(std::string name, DataType type,
+ std::string comment = "") {
+ columns_.push_back({std::move(name), type, std::move(comment)});
+ return *this;
+ }
+
+ Builder& SetPrimaryKeys(std::vector<std::string> keys) {
+ primary_keys_ = std::move(keys);
+ return *this;
+ }
+
+ Schema Build() {
+ return Schema{std::move(columns_), std::move(primary_keys_)};
+ }
+
+ private:
+ std::vector<Column> columns_;
+ std::vector<std::string> primary_keys_;
+ };
+
+ static Builder NewBuilder() { return Builder(); }
+};
+
+struct TableDescriptor {
+ Schema schema;
+ std::vector<std::string> partition_keys;
+ int32_t bucket_count{0};
+ std::vector<std::string> bucket_keys;
+ std::unordered_map<std::string, std::string> properties;
+ std::string comment;
+
+ class Builder {
+ public:
+ Builder& SetSchema(Schema s) {
+ schema_ = std::move(s);
+ return *this;
+ }
+
+ Builder& SetPartitionKeys(std::vector<std::string> keys) {
+ partition_keys_ = std::move(keys);
+ return *this;
+ }
+
+ Builder& SetBucketCount(int32_t count) {
+ bucket_count_ = count;
+ return *this;
+ }
+
+ Builder& SetBucketKeys(std::vector<std::string> keys) {
+ bucket_keys_ = std::move(keys);
+ return *this;
+ }
+
+ Builder& SetProperty(std::string key, std::string value) {
+ properties_[std::move(key)] = std::move(value);
+ return *this;
+ }
+
+ Builder& SetComment(std::string comment) {
+ comment_ = std::move(comment);
+ return *this;
+ }
+
+ TableDescriptor Build() {
+ return TableDescriptor{std::move(schema_),
+ std::move(partition_keys_),
+ bucket_count_,
+ std::move(bucket_keys_),
+ std::move(properties_),
+ std::move(comment_)};
+ }
+
+ private:
+ Schema schema_;
+ std::vector<std::string> partition_keys_;
+ int32_t bucket_count_{0};
+ std::vector<std::string> bucket_keys_;
+ std::unordered_map<std::string, std::string> properties_;
+ std::string comment_;
+ };
+
+ static Builder NewBuilder() { return Builder(); }
+};
+
+struct TableInfo {
+ int64_t table_id;
+ int32_t schema_id;
+ TablePath table_path;
+ int64_t created_time;
+ int64_t modified_time;
+ std::vector<std::string> primary_keys;
+ std::vector<std::string> bucket_keys;
+ std::vector<std::string> partition_keys;
+ int32_t num_buckets;
+ bool has_primary_key;
+ bool is_partitioned;
+ std::unordered_map<std::string, std::string> properties;
+ std::string comment;
+ Schema schema;
+};
+
+struct Datum {
+ DatumType type{DatumType::Null};
+ bool bool_val{false};
+ int32_t i32_val{0};
+ int64_t i64_val{0};
+ float f32_val{0.0F};
+ double f64_val{0.0};
+ std::string string_val;
+ std::vector<uint8_t> bytes_val;
+
+ static Datum Null() { return {}; }
+ static Datum Bool(bool v) {
+ Datum d;
+ d.type = DatumType::Bool;
+ d.bool_val = v;
+ return d;
+ }
+ static Datum Int32(int32_t v) {
+ Datum d;
+ d.type = DatumType::Int32;
+ d.i32_val = v;
+ return d;
+ }
+ static Datum Int64(int64_t v) {
+ Datum d;
+ d.type = DatumType::Int64;
+ d.i64_val = v;
+ return d;
+ }
+ static Datum Float32(float v) {
+ Datum d;
+ d.type = DatumType::Float32;
+ d.f32_val = v;
+ return d;
+ }
+ static Datum Float64(double v) {
+ Datum d;
+ d.type = DatumType::Float64;
+ d.f64_val = v;
+ return d;
+ }
+ static Datum String(std::string v) {
+ Datum d;
+ d.type = DatumType::String;
+ d.string_val = std::move(v);
+ return d;
+ }
+ static Datum Bytes(std::vector<uint8_t> v) {
+ Datum d;
+ d.type = DatumType::Bytes;
+ d.bytes_val = std::move(v);
+ return d;
+ }
+};
+
+struct GenericRow {
+ std::vector<Datum> fields;
+
+ void SetNull(size_t idx) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Null();
+ }
+
+ void SetBool(size_t idx, bool v) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Bool(v);
+ }
+
+ void SetInt32(size_t idx, int32_t v) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Int32(v);
+ }
+
+ void SetInt64(size_t idx, int64_t v) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Int64(v);
+ }
+
+ void SetFloat32(size_t idx, float v) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Float32(v);
+ }
+
+ void SetFloat64(size_t idx, double v) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Float64(v);
+ }
+
+ void SetString(size_t idx, std::string v) {
+ EnsureSize(idx);
+ fields[idx] = Datum::String(std::move(v));
+ }
+
+ void SetBytes(size_t idx, std::vector<uint8_t> v) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Bytes(std::move(v));
+ }
+
+private:
+ void EnsureSize(size_t idx) {
+ if (fields.size() <= idx) {
+ fields.resize(idx + 1);
+ }
+ }
+};
+
+struct ScanRecord {
+ int64_t offset;
+ int64_t timestamp;
+ GenericRow row;
+};
+
+struct ScanRecords {
+ std::vector<ScanRecord> records;
+
+ size_t Size() const { return records.size(); }
+ bool Empty() const { return records.empty(); }
+ const ScanRecord& operator[](size_t idx) const { return records[idx]; }
+
+ auto begin() const { return records.begin(); }
+ auto end() const { return records.end(); }
+};
+
+struct BucketOffset {
+ int64_t table_id;
+ int64_t partition_id;
+ int32_t bucket_id;
+ int64_t offset;
+};
+
+struct LakeSnapshot {
+ int64_t snapshot_id;
+ std::vector<BucketOffset> bucket_offsets;
+};
+
+class AppendWriter;
+class LogScanner;
+class Admin;
+class Table;
+
+class Connection {
+public:
+ Connection() noexcept;
+ ~Connection() noexcept;
+
+ Connection(const Connection&) = delete;
+ Connection& operator=(const Connection&) = delete;
+ Connection(Connection&& other) noexcept;
+ Connection& operator=(Connection&& other) noexcept;
+
+ static Result Connect(const std::string& bootstrap_server, Connection&
out);
+
+ bool Available() const;
+
+ Result GetAdmin(Admin& out);
+ Result GetTable(const TablePath& table_path, Table& out);
+
+private:
+ void Destroy() noexcept;
+ ffi::Connection* conn_{nullptr};
+};
+
+class Admin {
+public:
+ Admin() noexcept;
+ ~Admin() noexcept;
+
+ Admin(const Admin&) = delete;
+ Admin& operator=(const Admin&) = delete;
+ Admin(Admin&& other) noexcept;
+ Admin& operator=(Admin&& other) noexcept;
+
+ bool Available() const;
+
+ Result CreateTable(const TablePath& table_path,
+ const TableDescriptor& descriptor,
+ bool ignore_if_exists = false);
+
+ Result GetTable(const TablePath& table_path, TableInfo& out);
+
+ Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot&
out);
+
+private:
+ friend class Connection;
+ Admin(ffi::Admin* admin) noexcept;
+
+ void Destroy() noexcept;
+ ffi::Admin* admin_{nullptr};
+};
+
+class Table {
+public:
+ Table() noexcept;
+ ~Table() noexcept;
+
+ Table(const Table&) = delete;
+ Table& operator=(const Table&) = delete;
+ Table(Table&& other) noexcept;
+ Table& operator=(Table&& other) noexcept;
+
+ bool Available() const;
+
+ Result NewAppendWriter(AppendWriter& out);
+ Result NewLogScanner(LogScanner& out);
+ Result NewLogScannerWithProjection(const std::vector<size_t>&
column_indices, LogScanner& out);
+
+ TableInfo GetTableInfo() const;
+ TablePath GetTablePath() const;
+ bool HasPrimaryKey() const;
+
+private:
+ friend class Connection;
+ Table(ffi::Table* table) noexcept;
+
+ void Destroy() noexcept;
+ ffi::Table* table_{nullptr};
+};
+
+class AppendWriter {
+public:
+ AppendWriter() noexcept;
+ ~AppendWriter() noexcept;
+
+ AppendWriter(const AppendWriter&) = delete;
+ AppendWriter& operator=(const AppendWriter&) = delete;
+ AppendWriter(AppendWriter&& other) noexcept;
+ AppendWriter& operator=(AppendWriter&& other) noexcept;
+
+ bool Available() const;
+
+ Result Append(const GenericRow& row);
+ Result Flush();
+
+private:
+ friend class Table;
+ AppendWriter(ffi::AppendWriter* writer) noexcept;
+
+ void Destroy() noexcept;
+ ffi::AppendWriter* writer_{nullptr};
+};
+
+class LogScanner {
+public:
+ LogScanner() noexcept;
+ ~LogScanner() noexcept;
+
+ LogScanner(const LogScanner&) = delete;
+ LogScanner& operator=(const LogScanner&) = delete;
+ LogScanner(LogScanner&& other) noexcept;
+ LogScanner& operator=(LogScanner&& other) noexcept;
+
+ bool Available() const;
+
+ Result Subscribe(int32_t bucket_id, int64_t start_offset);
+ Result Poll(int64_t timeout_ms, ScanRecords& out);
+
+private:
+ friend class Table;
+ LogScanner(ffi::LogScanner* scanner) noexcept;
+
+ void Destroy() noexcept;
+ ffi::LogScanner* scanner_{nullptr};
+};
+
+} // namespace fluss
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
new file mode 100644
index 0000000..f6997a6
--- /dev/null
+++ b/bindings/cpp/src/admin.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+Admin::Admin() noexcept = default;
+
+Admin::Admin(ffi::Admin* admin) noexcept : admin_(admin) {}
+
+Admin::~Admin() noexcept { Destroy(); }
+
+void Admin::Destroy() noexcept {
+ if (admin_) {
+ ffi::delete_admin(admin_);
+ admin_ = nullptr;
+ }
+}
+
+Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) {
+ other.admin_ = nullptr;
+}
+
+Admin& Admin::operator=(Admin&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ admin_ = other.admin_;
+ other.admin_ = nullptr;
+ }
+ return *this;
+}
+
+bool Admin::Available() const { return admin_ != nullptr; }
+
+Result Admin::CreateTable(const TablePath& table_path,
+ const TableDescriptor& descriptor,
+ bool ignore_if_exists) {
+ if (!Available()) {
+ return utils::make_error(1, "Admin not available");
+ }
+
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+ auto ffi_desc = utils::to_ffi_table_descriptor(descriptor);
+
+ auto ffi_result = admin_->create_table(ffi_path, ffi_desc,
ignore_if_exists);
+ return utils::from_ffi_result(ffi_result);
+}
+
+Result Admin::GetTable(const TablePath& table_path, TableInfo& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Admin not available");
+ }
+
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+ auto ffi_result = admin_->get_table_info(ffi_path);
+
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out = utils::from_ffi_table_info(ffi_result.table_info);
+ }
+
+ return result;
+}
+
+Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot&
out) {
+ if (!Available()) {
+ return utils::make_error(1, "Admin not available");
+ }
+
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+ auto ffi_result = admin_->get_latest_lake_snapshot(ffi_path);
+
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out = utils::from_ffi_lake_snapshot(ffi_result.lake_snapshot);
+ }
+
+ return result;
+}
+
+} // namespace fluss
diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
new file mode 100644
index 0000000..ea884cd
--- /dev/null
+++ b/bindings/cpp/src/connection.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+Connection::Connection() noexcept = default;
+
+Connection::~Connection() noexcept { Destroy(); }
+
+void Connection::Destroy() noexcept {
+ if (conn_) {
+ ffi::delete_connection(conn_);
+ conn_ = nullptr;
+ }
+}
+
+Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) {
+ other.conn_ = nullptr;
+}
+
+Connection& Connection::operator=(Connection&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ conn_ = other.conn_;
+ other.conn_ = nullptr;
+ }
+ return *this;
+}
+
+Result Connection::Connect(const std::string& bootstrap_server, Connection&
out) {
+ try {
+ out.conn_ = ffi::new_connection(bootstrap_server);
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+bool Connection::Available() const { return conn_ != nullptr; }
+
+Result Connection::GetAdmin(Admin& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Connection not available");
+ }
+
+ try {
+ out.admin_ = conn_->get_admin();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Connection::GetTable(const TablePath& table_path, Table& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Connection not available");
+ }
+
+ try {
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+ out.table_ = conn_->get_table(ffi_path);
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+} // namespace fluss
diff --git a/bindings/cpp/src/ffi_converter.hpp
b/bindings/cpp/src/ffi_converter.hpp
new file mode 100644
index 0000000..52dd7fe
--- /dev/null
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+
+namespace fluss {
+namespace utils {
+
+// Builds a failed Result with the given code and message.
+inline Result make_error(int32_t code, std::string msg) {
+  return Result{code, std::move(msg)};
+}
+
+// Builds a successful Result (code 0, empty message).
+inline Result make_ok() {
+  return Result{0, {}};
+}
+
+// Copies an FFI result (code + message) into the public Result type.
+inline Result from_ffi_result(const ffi::FfiResult& ffi_result) {
+  return Result{ffi_result.error_code,
std::string(ffi_result.error_message)};
+}
+
+// Converts a C++ TablePath into its FFI mirror for crossing the cxx bridge.
+inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
+  ffi::FfiTablePath ffi_path;
+  ffi_path.database_name = rust::String(path.database_name);
+  ffi_path.table_name = rust::String(path.table_name);
+  return ffi_path;
+}
+
+// Converts a C++ Column into its FFI mirror; the DataType enum is carried
+// across the bridge as a raw int32 tag.
+inline ffi::FfiColumn to_ffi_column(const Column& col) {
+  ffi::FfiColumn ffi_col;
+  ffi_col.name = rust::String(col.name);
+  ffi_col.data_type = static_cast<int32_t>(col.data_type);
+  ffi_col.comment = rust::String(col.comment);
+  return ffi_col;
+}
+
+// Converts a C++ Schema into its FFI mirror for crossing the cxx bridge.
+inline ffi::FfiSchema to_ffi_schema(const Schema& schema) {
+  ffi::FfiSchema out;
+  for (const auto& column : schema.columns) {
+    out.columns.push_back(to_ffi_column(column));
+  }
+  for (const auto& key : schema.primary_keys) {
+    out.primary_keys.push_back(rust::String(key));
+  }
+  return out;
+}
+
+// Converts a C++ TableDescriptor into its FFI mirror. Properties are
+// flattened into key/value pairs because the bridge has no map type.
+inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& desc) {
+  ffi::FfiTableDescriptor out;
+  out.schema = to_ffi_schema(desc.schema);
+  for (const auto& key : desc.partition_keys) {
+    out.partition_keys.push_back(rust::String(key));
+  }
+  out.bucket_count = desc.bucket_count;
+  for (const auto& key : desc.bucket_keys) {
+    out.bucket_keys.push_back(rust::String(key));
+  }
+  for (const auto& entry : desc.properties) {
+    ffi::HashMapValue pair;
+    pair.key = rust::String(entry.first);
+    pair.value = rust::String(entry.second);
+    out.properties.push_back(pair);
+  }
+  out.comment = rust::String(desc.comment);
+  return out;
+}
+
+// Converts a C++ Datum into its FFI mirror. Every value slot is copied
+// unconditionally; datum_type records which one the row actually uses.
+inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
+  ffi::FfiDatum out;
+  out.datum_type = static_cast<int32_t>(datum.type);
+  out.bool_val = datum.bool_val;
+  out.i32_val = datum.i32_val;
+  out.i64_val = datum.i64_val;
+  out.f32_val = datum.f32_val;
+  out.f64_val = datum.f64_val;
+  out.string_val = rust::String(datum.string_val);
+  for (uint8_t byte : datum.bytes_val) {
+    out.bytes_val.push_back(byte);
+  }
+  return out;
+}
+
+// Converts a C++ GenericRow (one datum per column) into its FFI mirror.
+inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
+  ffi::FfiGenericRow out;
+  for (const auto& field : row.fields) {
+    out.fields.push_back(to_ffi_datum(field));
+  }
+  return out;
+}
+
+// Converts an FFI column back into the public C++ Column struct.
+inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
+  Column col;
+  col.name = std::string(ffi_col.name);
+  col.data_type = static_cast<DataType>(ffi_col.data_type);
+  col.comment = std::string(ffi_col.comment);
+  return col;
+}
+
+// Converts an FFI schema back into the public C++ Schema struct.
+inline Schema from_ffi_schema(const ffi::FfiSchema& ffi_schema) {
+  Schema schema;
+  schema.columns.reserve(ffi_schema.columns.size());
+  for (const auto& ffi_col : ffi_schema.columns) {
+    schema.columns.push_back(from_ffi_column(ffi_col));
+  }
+  schema.primary_keys.reserve(ffi_schema.primary_keys.size());
+  for (const auto& key : ffi_schema.primary_keys) {
+    schema.primary_keys.push_back(std::string(key));
+  }
+  return schema;
+}
+
+// Converts an FFI table-info struct back into the public C++ TableInfo.
+// Field-by-field copy; properties are re-assembled from flattened
+// key/value pairs into the map.
+inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) {
+  TableInfo info;
+
+  info.table_id = ffi_info.table_id;
+  info.schema_id = ffi_info.schema_id;
+  info.table_path = TablePath{
+      std::string(ffi_info.table_path.database_name),
+      std::string(ffi_info.table_path.table_name)};
+  info.created_time = ffi_info.created_time;
+  info.modified_time = ffi_info.modified_time;
+
+  for (const auto& pk : ffi_info.primary_keys) {
+    info.primary_keys.push_back(std::string(pk));
+  }
+
+  for (const auto& bk : ffi_info.bucket_keys) {
+    info.bucket_keys.push_back(std::string(bk));
+  }
+
+  for (const auto& pk : ffi_info.partition_keys) {
+    info.partition_keys.push_back(std::string(pk));
+  }
+
+  info.num_buckets = ffi_info.num_buckets;
+  info.has_primary_key = ffi_info.has_primary_key;
+  info.is_partitioned = ffi_info.is_partitioned;
+
+  for (const auto& prop : ffi_info.properties) {
+    info.properties[std::string(prop.key)] = std::string(prop.value);
+  }
+
+  info.comment = std::string(ffi_info.comment);
+  info.schema = from_ffi_schema(ffi_info.schema);
+
+  return info;
+}
+
+// Converts an FFI datum back into the public C++ Datum struct.
+inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
+  Datum datum;
+  datum.type = static_cast<DatumType>(ffi_datum.datum_type);
+  datum.bool_val = ffi_datum.bool_val;
+  datum.i32_val = ffi_datum.i32_val;
+  datum.i64_val = ffi_datum.i64_val;
+  datum.f32_val = ffi_datum.f32_val;
+  datum.f64_val = ffi_datum.f64_val;
+  // todo: avoid copy string
+  datum.string_val = std::string(ffi_datum.string_val);
+  datum.bytes_val.assign(ffi_datum.bytes_val.begin(), ffi_datum.bytes_val.end());
+  return datum;
+}
+
+// Converts an FFI row back into the public C++ GenericRow.
+inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) {
+  GenericRow row;
+  row.fields.reserve(ffi_row.fields.size());
+  for (const auto& ffi_field : ffi_row.fields) {
+    row.fields.push_back(from_ffi_datum(ffi_field));
+  }
+  return row;
+}
+
+// Converts one FFI scan record (offset, timestamp, row).
+inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) {
+  return ScanRecord{
+      ffi_record.offset,
+      ffi_record.timestamp,
+      from_ffi_generic_row(ffi_record.row)};
+}
+
+// Converts a batch of FFI scan records.
+inline ScanRecords from_ffi_scan_records(const ffi::FfiScanRecords& ffi_records) {
+  ScanRecords records;
+  records.records.reserve(ffi_records.records.size());
+  for (const auto& ffi_record : ffi_records.records) {
+    records.records.push_back(from_ffi_scan_record(ffi_record));
+  }
+  return records;
+}
+
+// Converts an FFI lake snapshot into the public C++ LakeSnapshot,
+// preserving the per-bucket offset list.
+inline LakeSnapshot from_ffi_lake_snapshot(const ffi::FfiLakeSnapshot& ffi_snapshot) {
+  LakeSnapshot snapshot;
+  snapshot.snapshot_id = ffi_snapshot.snapshot_id;
+  snapshot.bucket_offsets.reserve(ffi_snapshot.bucket_offsets.size());
+  for (const auto& off : ffi_snapshot.bucket_offsets) {
+    snapshot.bucket_offsets.push_back(
+        BucketOffset{off.table_id, off.partition_id, off.bucket_id, off.offset});
+  }
+  return snapshot;
+}
+
+} // namespace utils
+} // namespace fluss
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
new file mode 100644
index 0000000..3e883e2
--- /dev/null
+++ b/bindings/cpp/src/lib.rs
@@ -0,0 +1,523 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod types;
+
+use std::sync::{Arc, LazyLock};
+use std::time::Duration;
+
+use fluss as fcore;
+
+// Shared multi-threaded Tokio runtime that drives the async fluss client
+// from the synchronous C++ API. Built lazily on first use.
+static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        // Runtime construction only fails on resource exhaustion; state the
+        // invariant instead of a bare unwrap.
+        .expect("failed to build tokio runtime for fluss cpp bindings")
+});
+
+// cxx bridge: shared structs are visible to both languages; the extern
+// "Rust" section exposes the Rust handle types and functions to C++.
+#[cxx::bridge(namespace = "fluss::ffi")]
+mod ffi {
+    // Key/value pair used to pass map-like properties across the bridge
+    // (the bridge has no built-in map type).
+    struct HashMapValue {
+        key: String,
+        value: String,
+    }
+
+    // error_code 0 means success; non-zero carries a message.
+    struct FfiResult {
+        error_code: i32,
+        error_message: String,
+    }
+
+    struct FfiTablePath {
+        database_name: String,
+        table_name: String,
+    }
+
+    // data_type is an integer tag; see the DATA_TYPE_* constants in types.rs.
+    struct FfiColumn {
+        name: String,
+        data_type: i32,
+        comment: String,
+    }
+
+    struct FfiSchema {
+        columns: Vec<FfiColumn>,
+        primary_keys: Vec<String>,
+    }
+
+    struct FfiTableDescriptor {
+        schema: FfiSchema,
+        partition_keys: Vec<String>,
+        bucket_count: i32,
+        bucket_keys: Vec<String>,
+        properties: Vec<HashMapValue>,
+        comment: String,
+    }
+
+    struct FfiTableInfo {
+        table_id: i64,
+        schema_id: i32,
+        table_path: FfiTablePath,
+        created_time: i64,
+        modified_time: i64,
+        primary_keys: Vec<String>,
+        bucket_keys: Vec<String>,
+        partition_keys: Vec<String>,
+        num_buckets: i32,
+        has_primary_key: bool,
+        is_partitioned: bool,
+        properties: Vec<HashMapValue>,
+        comment: String,
+        schema: FfiSchema,
+    }
+
+    struct FfiTableInfoResult {
+        result: FfiResult,
+        table_info: FfiTableInfo,
+    }
+
+    // A single cell value. All value slots are always present; datum_type
+    // (see the DATUM_TYPE_* constants in types.rs) selects the active one.
+    struct FfiDatum {
+        datum_type: i32,
+        bool_val: bool,
+        i32_val: i32,
+        i64_val: i64,
+        f32_val: f32,
+        f64_val: f64,
+        string_val: String,
+        bytes_val: Vec<u8>,
+    }
+
+    struct FfiGenericRow {
+        fields: Vec<FfiDatum>,
+    }
+
+    struct FfiScanRecord {
+        offset: i64,
+        timestamp: i64,
+        row: FfiGenericRow,
+    }
+
+    struct FfiScanRecords {
+        records: Vec<FfiScanRecord>,
+    }
+
+    struct FfiScanRecordsResult {
+        result: FfiResult,
+        scan_records: FfiScanRecords,
+    }
+
+    struct FfiLakeSnapshot {
+        snapshot_id: i64,
+        bucket_offsets: Vec<FfiBucketOffset>,
+    }
+
+    struct FfiBucketOffset {
+        table_id: i64,
+        partition_id: i64,
+        bucket_id: i32,
+        offset: i64,
+    }
+
+    struct FfiLakeSnapshotResult {
+        result: FfiResult,
+        lake_snapshot: FfiLakeSnapshot,
+    }
+
+    // Handles are returned to C++ as raw pointers and must be freed with
+    // the matching delete_* function below.
+    extern "Rust" {
+        type Connection;
+        type Admin;
+        type Table;
+        type AppendWriter;
+        type LogScanner;
+
+        // Connection
+        fn new_connection(bootstrap_server: &str) -> Result<*mut Connection>;
+        unsafe fn delete_connection(conn: *mut Connection);
+        fn get_admin(self: &Connection) -> Result<*mut Admin>;
+        fn get_table(self: &Connection, table_path: &FfiTablePath) ->
Result<*mut Table>;
+
+        // Admin
+        unsafe fn delete_admin(admin: *mut Admin);
+        fn create_table(
+            self: &Admin,
+            table_path: &FfiTablePath,
+            descriptor: &FfiTableDescriptor,
+            ignore_if_exists: bool,
+        ) -> FfiResult;
+        fn get_table_info(self: &Admin, table_path: &FfiTablePath) ->
FfiTableInfoResult;
+        fn get_latest_lake_snapshot(
+            self: &Admin,
+            table_path: &FfiTablePath,
+        ) -> FfiLakeSnapshotResult;
+
+        // Table
+        unsafe fn delete_table(table: *mut Table);
+        fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>;
+        fn new_log_scanner(self: &Table) -> Result<*mut LogScanner>;
+        fn new_log_scanner_with_projection(
+            self: &Table,
+            column_indices: Vec<usize>,
+        ) -> Result<*mut LogScanner>;
+        fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
+        fn get_table_path(self: &Table) -> FfiTablePath;
+        fn has_primary_key(self: &Table) -> bool;
+
+        // AppendWriter
+        unsafe fn delete_append_writer(writer: *mut AppendWriter);
+        fn append(self: &mut AppendWriter, row: &FfiGenericRow) -> FfiResult;
+        fn flush(self: &mut AppendWriter) -> FfiResult;
+
+        // LogScanner
+        unsafe fn delete_log_scanner(scanner: *mut LogScanner);
+        fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) ->
FfiResult;
+        fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
+    }
+}
+
+// Opaque handle to an open Fluss cluster connection; shared (via Arc) by
+// the tables created from it.
+pub struct Connection {
+    inner: Arc<fcore::client::FlussConnection>,
+    #[allow(dead_code)]
+    metadata: Option<Arc<fcore::client::Metadata>>,
+}
+
+// Opaque handle wrapping the core admin client.
+pub struct Admin {
+    inner: fcore::client::FlussAdmin,
+}
+
+// Opaque table handle. Caches table info, path and the primary-key flag so
+// the accessor functions below don't need another lookup.
+pub struct Table {
+    connection: Arc<fcore::client::FlussConnection>,
+    metadata: Arc<fcore::client::Metadata>,
+    table_info: fcore::metadata::TableInfo,
+    table_path: fcore::metadata::TablePath,
+    has_pk: bool,
+}
+
+// Opaque handle wrapping the core append writer.
+pub struct AppendWriter {
+    inner: fcore::client::AppendWriter,
+}
+
+// Opaque handle wrapping the core log scanner.
+pub struct LogScanner {
+    inner: fcore::client::LogScanner,
+}
+
+// Builds the FfiResult used for a successful call (code 0, empty message).
+fn ok_result() -> ffi::FfiResult {
+    ffi::FfiResult {
+        error_code: 0,
+        error_message: String::new(),
+    }
+}
+
+// Builds an FfiResult carrying an error code and message.
+fn err_result(code: i32, msg: String) -> ffi::FfiResult {
+    ffi::FfiResult {
+        error_code: code,
+        error_message: msg,
+    }
+}
+
+// Connection implementation
+
+/// Connects to the given bootstrap server and returns a heap-allocated
+/// handle for C++; errors are reported as formatted strings (cxx turns a
+/// `Result::Err` into a thrown `rust::Error`).
+fn new_connection(bootstrap_server: &str) -> Result<*mut Connection, String> {
+    // Use the file-wide `fcore` alias for consistency with the rest of the
+    // module (the original mixed `fluss::` and `fcore::`).
+    let config = fcore::config::Config {
+        bootstrap_server: Some(bootstrap_server.to_string()),
+        ..Default::default()
+    };
+
+    // `block_on` accepts the future directly; no async block needed.
+    match RUNTIME.block_on(fcore::client::FlussConnection::new(config)) {
+        Ok(c) => Ok(Box::into_raw(Box::new(Connection {
+            inner: Arc::new(c),
+            metadata: None,
+        }))),
+        Err(e) => Err(format!("Failed to connect: {}", e)),
+    }
+}
+
+/// Frees a `Connection` previously returned by `new_connection`.
+///
+/// # Safety
+/// `conn` must be null or a pointer obtained from `new_connection` that has
+/// not already been freed; it must not be used afterwards.
+unsafe fn delete_connection(conn: *mut Connection) {
+    if !conn.is_null() {
+        // SAFETY: per the contract above, `conn` came from Box::into_raw.
+        unsafe {
+            drop(Box::from_raw(conn));
+        }
+    }
+}
+
+impl Connection {
+    /// Creates an admin client bound to this connection and returns it as a
+    /// heap-allocated raw pointer for C++.
+    fn get_admin(&self) -> Result<*mut Admin, String> {
+        RUNTIME
+            .block_on(self.inner.get_admin())
+            .map(|admin| Box::into_raw(Box::new(Admin { inner: admin })))
+            .map_err(|e| format!("Failed to get admin: {}", e))
+    }
+
+    /// Opens the table named by `table_path` and returns a heap-allocated
+    /// handle caching its metadata, info, path and primary-key flag.
+    fn get_table(&self, table_path: &ffi::FfiTablePath) -> Result<*mut Table,
String> {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        RUNTIME
+            .block_on(self.inner.get_table(&path))
+            .map(|t| {
+                Box::into_raw(Box::new(Table {
+                    connection: self.inner.clone(),
+                    metadata: t.metadata().clone(),
+                    table_info: t.table_info().clone(),
+                    table_path: t.table_path().clone(),
+                    has_pk: t.has_primary_key(),
+                }))
+            })
+            .map_err(|e| format!("Failed to get table: {}", e))
+    }
+}
+
+// Admin implementation
+
+/// Frees an `Admin` previously returned by `Connection::get_admin`.
+///
+/// # Safety
+/// `admin` must be null or a pointer obtained from `get_admin` that has not
+/// already been freed; it must not be used afterwards.
+unsafe fn delete_admin(admin: *mut Admin) {
+    if !admin.is_null() {
+        unsafe {
+            drop(Box::from_raw(admin));
+        }
+    }
+}
+
+impl Admin {
+    /// Creates a table from the FFI descriptor. Error code 1 means the
+    /// descriptor could not be converted; code 2 means the create itself
+    /// failed.
+    fn create_table(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        descriptor: &ffi::FfiTableDescriptor,
+        ignore_if_exists: bool,
+    ) -> ffi::FfiResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let core_descriptor = match types::ffi_descriptor_to_core(descriptor) {
+            Ok(d) => d,
+            Err(e) => return err_result(1, e.to_string()),
+        };
+
+        let result = RUNTIME.block_on(async {
+            self.inner
+                .create_table(&path, &core_descriptor, ignore_if_exists)
+                .await
+        });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(2, e.to_string()),
+        }
+    }
+
+    /// Fetches table metadata; on failure the payload is a placeholder
+    /// (`empty_table_info`) and `result` carries the error.
+    fn get_table_info(&self, table_path: &ffi::FfiTablePath) ->
ffi::FfiTableInfoResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let result = RUNTIME.block_on(async {
self.inner.get_table(&path).await });
+
+        match result {
+            Ok(info) => ffi::FfiTableInfoResult {
+                result: ok_result(),
+                table_info: types::core_table_info_to_ffi(&info),
+            },
+            Err(e) => ffi::FfiTableInfoResult {
+                result: err_result(1, e.to_string()),
+                table_info: types::empty_table_info(),
+            },
+        }
+    }
+
+    /// Fetches the most recent lake snapshot for the table; on failure the
+    /// payload uses snapshot_id -1 as a sentinel and `result` carries the
+    /// error.
+    fn get_latest_lake_snapshot(
+        &self,
+        table_path: &ffi::FfiTablePath,
+    ) -> ffi::FfiLakeSnapshotResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let result = RUNTIME.block_on(async {
self.inner.get_latest_lake_snapshot(&path).await });
+
+        match result {
+            Ok(snapshot) => ffi::FfiLakeSnapshotResult {
+                result: ok_result(),
+                lake_snapshot: types::core_lake_snapshot_to_ffi(&snapshot),
+            },
+            Err(e) => ffi::FfiLakeSnapshotResult {
+                result: err_result(1, e.to_string()),
+                lake_snapshot: ffi::FfiLakeSnapshot {
+                    snapshot_id: -1,
+                    bucket_offsets: vec![],
+                },
+            },
+        }
+    }
+}
+
+// Table implementation
+
+/// Frees a `Table` previously returned by `Connection::get_table`.
+///
+/// # Safety
+/// `table` must be null or a pointer obtained from `get_table` that has not
+/// already been freed; it must not be used afterwards.
+unsafe fn delete_table(table: *mut Table) {
+    if !table.is_null() {
+        unsafe {
+            drop(Box::from_raw(table));
+        }
+    }
+}
+
+impl Table {
+    /// Creates an append writer for this table, returned as a raw pointer
+    /// for C++.
+    fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
+        // Enter the Tokio runtime context before constructing the writer.
+        // NOTE(review): the scanner constructors below do not enter the
+        // runtime — confirm whether they need to as well.
+        let _enter = RUNTIME.enter();
+
+        let fluss_table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let table_append = match fluss_table.new_append() {
+            Ok(a) => a,
+            Err(e) => return Err(format!("Failed to create append: {}", e)),
+        };
+
+        let writer = table_append.create_writer();
+        let writer = Box::into_raw(Box::new(AppendWriter { inner: writer }));
+        Ok(writer)
+    }
+
+    /// Creates a log scanner over all columns of this table.
+    fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
+        let fluss_table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let scanner = fluss_table.new_scan().create_log_scanner();
+        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
+        Ok(scanner)
+    }
+
+    /// Creates a log scanner that reads only the given column indices.
+    fn new_log_scanner_with_projection(
+        &self,
+        column_indices: Vec<usize>,
+    ) -> Result<*mut LogScanner, String> {
+        let fluss_table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let scan = fluss_table.new_scan();
+        let scan = match scan.project(&column_indices) {
+            Ok(s) => s,
+            Err(e) => return Err(format!("Failed to project columns: {}", e)),
+        };
+        let scanner = scan.create_log_scanner();
+        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
+        Ok(scanner)
+    }
+
+    /// Returns the cached table info (no server round trip).
+    fn get_table_info_from_table(&self) -> ffi::FfiTableInfo {
+        types::core_table_info_to_ffi(&self.table_info)
+    }
+
+    /// Returns the cached table path.
+    fn get_table_path(&self) -> ffi::FfiTablePath {
+        ffi::FfiTablePath {
+            database_name: self.table_path.database().to_string(),
+            table_name: self.table_path.table().to_string(),
+        }
+    }
+
+    /// Returns the cached primary-key flag.
+    fn has_primary_key(&self) -> bool {
+        self.has_pk
+    }
+}
+
+// AppendWriter implementation
+
+/// Frees an `AppendWriter` previously returned by `Table::new_append_writer`.
+///
+/// # Safety
+/// `writer` must be null or a pointer obtained from `new_append_writer` that
+/// has not already been freed; it must not be used afterwards.
+unsafe fn delete_append_writer(writer: *mut AppendWriter) {
+    if !writer.is_null() {
+        unsafe {
+            drop(Box::from_raw(writer));
+        }
+    }
+}
+
+impl AppendWriter {
+    /// Converts the FFI row to a core row and appends it, blocking on the
+    /// shared runtime until the async call completes.
+    fn append(&mut self, row: &ffi::FfiGenericRow) -> ffi::FfiResult {
+        let generic_row = types::ffi_row_to_core(row);
+
+        match RUNTIME.block_on(self.inner.append(generic_row)) {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+
+    /// Flushes any buffered appends, blocking until the flush completes.
+    fn flush(&mut self) -> ffi::FfiResult {
+        match RUNTIME.block_on(self.inner.flush()) {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+}
+
+// LogScanner implementation
+
+/// Frees a `LogScanner` previously returned by the `Table` scanner factories.
+///
+/// # Safety
+/// `scanner` must be null or a pointer obtained from `new_log_scanner` /
+/// `new_log_scanner_with_projection` that has not already been freed; it
+/// must not be used afterwards.
+unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
+    if !scanner.is_null() {
+        unsafe {
+            drop(Box::from_raw(scanner));
+        }
+    }
+}
+
+impl LogScanner {
+    /// Subscribes to one bucket starting at `start_offset`.
+    fn subscribe(&self, bucket_id: i32, start_offset: i64) -> ffi::FfiResult {
+        let result = RUNTIME.block_on(self.inner.subscribe(bucket_id, start_offset));
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+
+    /// Polls for records, waiting at most `timeout_ms` milliseconds.
+    fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
+        // Clamp negative timeouts to zero: a bare `as u64` cast would wrap a
+        // negative value into an enormous timeout.
+        let timeout = Duration::from_millis(u64::try_from(timeout_ms).unwrap_or(0));
+        let result = RUNTIME.block_on(self.inner.poll(timeout));
+
+        match result {
+            Ok(records) => ffi::FfiScanRecordsResult {
+                result: ok_result(),
+                scan_records: types::core_scan_records_to_ffi(&records),
+            },
+            Err(e) => ffi::FfiScanRecordsResult {
+                result: err_result(1, e.to_string()),
+                scan_records: ffi::FfiScanRecords { records: vec![] },
+            },
+        }
+    }
+}
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
new file mode 100644
index 0000000..b28b783
--- /dev/null
+++ b/bindings/cpp/src/table.cpp
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+// Creates an empty handle; table_ stays null until assigned.
+Table::Table() noexcept = default;
+
+// Takes ownership of a raw FFI table pointer.
+Table::Table(ffi::Table* table) noexcept : table_(table) {}
+
+// Releases the underlying Rust table object, if any.
+Table::~Table() noexcept { Destroy(); }
+
+// Frees the Rust-side table and resets the pointer so repeated calls (and
+// the destructor after an explicit Destroy) are safe no-ops.
+void Table::Destroy() noexcept {
+  if (table_) {
+    ffi::delete_table(table_);
+    table_ = nullptr;
+  }
+}
+
+// Move construction transfers ownership; the source is left empty so only
+// one wrapper ever frees the pointer.
+Table::Table(Table&& other) noexcept : table_(other.table_) {
+  other.table_ = nullptr;
+}
+
+// Move assignment: free any table we currently own, then steal the source's
+// pointer. Self-assignment is guarded explicitly.
+Table& Table::operator=(Table&& other) noexcept {
+  if (this != &other) {
+    Destroy();
+    table_ = other.table_;
+    other.table_ = nullptr;
+  }
+  return *this;
+}
+
+// True while this wrapper owns a live Rust table.
+bool Table::Available() const { return table_ != nullptr; }
+
+// Creates an append writer for this table and stores it in `out`.
+Result Table::NewAppendWriter(AppendWriter& out) {
+  if (!Available()) {
+    return utils::make_error(1, "Table not available");
+  }
+
+  try {
+    auto* writer = table_->new_append_writer();
+    // Release any writer `out` already owns before overwriting the raw
+    // pointer; assigning directly would leak the previous Rust object.
+    if (out.writer_) {
+      ffi::delete_append_writer(out.writer_);
+    }
+    out.writer_ = writer;
+    return utils::make_ok();
+  } catch (const rust::Error& e) {
+    return utils::make_error(1, e.what());
+  } catch (const std::exception& e) {
+    return utils::make_error(1, e.what());
+  }
+}
+
+// Creates a log scanner over all columns and stores it in `out`.
+Result Table::NewLogScanner(LogScanner& out) {
+  if (!Available()) {
+    return utils::make_error(1, "Table not available");
+  }
+
+  try {
+    auto* scanner = table_->new_log_scanner();
+    // Same ownership rule as NewAppendWriter: release before overwriting.
+    if (out.scanner_) {
+      ffi::delete_log_scanner(out.scanner_);
+    }
+    out.scanner_ = scanner;
+    return utils::make_ok();
+  } catch (const rust::Error& e) {
+    return utils::make_error(1, e.what());
+  } catch (const std::exception& e) {
+    return utils::make_error(1, e.what());
+  }
+}
+
+// Creates a log scanner that reads only the given column indices and stores
+// it in `out`.
+Result Table::NewLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out) {
+  if (!Available()) {
+    return utils::make_error(1, "Table not available");
+  }
+
+  try {
+    rust::Vec<size_t> rust_indices;
+    for (size_t idx : column_indices) {
+      rust_indices.push_back(idx);
+    }
+    auto* scanner = table_->new_log_scanner_with_projection(std::move(rust_indices));
+    // Same ownership rule as NewAppendWriter: release before overwriting.
+    if (out.scanner_) {
+      ffi::delete_log_scanner(out.scanner_);
+    }
+    out.scanner_ = scanner;
+    return utils::make_ok();
+  } catch (const rust::Error& e) {
+    return utils::make_error(1, e.what());
+  } catch (const std::exception& e) {
+    return utils::make_error(1, e.what());
+  }
+}
+
+// Returns the table's metadata, or a default-constructed TableInfo when the
+// handle is empty.
+TableInfo Table::GetTableInfo() const {
+  if (!Available()) {
+    return TableInfo{};
+  }
+  auto ffi_info = table_->get_table_info_from_table();
+  return utils::from_ffi_table_info(ffi_info);
+}
+
+// Returns the table's database/table path, or an empty TablePath when the
+// handle is empty.
+TablePath Table::GetTablePath() const {
+  if (!Available()) {
+    return TablePath{};
+  }
+  auto ffi_path = table_->get_table_path();
+  return TablePath{std::string(ffi_path.database_name),
std::string(ffi_path.table_name)};
+}
+
+// Returns whether the table has a primary key; false for an empty handle.
+bool Table::HasPrimaryKey() const {
+  if (!Available()) {
+    return false;
+  }
+  return table_->has_primary_key();
+}
+
+// AppendWriter implementation
+
+// Creates an empty handle; writer_ stays null until assigned.
+AppendWriter::AppendWriter() noexcept = default;
+
+// Takes ownership of a raw FFI writer pointer.
+AppendWriter::AppendWriter(ffi::AppendWriter* writer) noexcept :
writer_(writer) {}
+
+// Releases the underlying Rust writer, if any.
+AppendWriter::~AppendWriter() noexcept { Destroy(); }
+
+// Frees the Rust-side writer and resets the pointer so repeated calls are
+// safe no-ops.
+void AppendWriter::Destroy() noexcept {
+  if (writer_) {
+    ffi::delete_append_writer(writer_);
+    writer_ = nullptr;
+  }
+}
+
+// Move construction transfers ownership; the source is left empty.
+AppendWriter::AppendWriter(AppendWriter&& other) noexcept :
writer_(other.writer_) {
+  other.writer_ = nullptr;
+}
+
+// Move assignment: free any writer we currently own, then steal the
+// source's pointer. Self-assignment is guarded explicitly.
+AppendWriter& AppendWriter::operator=(AppendWriter&& other) noexcept {
+  if (this != &other) {
+    Destroy();
+    writer_ = other.writer_;
+    other.writer_ = nullptr;
+  }
+  return *this;
+}
+
+// True while this wrapper owns a live Rust writer.
+bool AppendWriter::Available() const { return writer_ != nullptr; }
+
+// Converts the row to its FFI form and appends it; errors are returned via
+// Result rather than thrown.
+Result AppendWriter::Append(const GenericRow& row) {
+  if (!Available()) {
+    return utils::make_error(1, "AppendWriter not available");
+  }
+
+  auto ffi_row = utils::to_ffi_generic_row(row);
+  auto ffi_result = writer_->append(ffi_row);
+  return utils::from_ffi_result(ffi_result);
+}
+
+// Flushes buffered appends; errors are returned via Result.
+Result AppendWriter::Flush() {
+  if (!Available()) {
+    return utils::make_error(1, "AppendWriter not available");
+  }
+
+  auto ffi_result = writer_->flush();
+  return utils::from_ffi_result(ffi_result);
+}
+
+// LogScanner implementation
+
+// Creates an empty handle; scanner_ stays null until assigned.
+LogScanner::LogScanner() noexcept = default;
+
+// Takes ownership of a raw FFI scanner pointer.
+LogScanner::LogScanner(ffi::LogScanner* scanner) noexcept : scanner_(scanner)
{}
+
+// Releases the underlying Rust scanner, if any.
+LogScanner::~LogScanner() noexcept { Destroy(); }
+
+// Frees the Rust-side scanner and resets the pointer so repeated calls are
+// safe no-ops.
+void LogScanner::Destroy() noexcept {
+  if (scanner_) {
+    ffi::delete_log_scanner(scanner_);
+    scanner_ = nullptr;
+  }
+}
+
+// Move construction transfers ownership; the source is left empty.
+LogScanner::LogScanner(LogScanner&& other) noexcept : scanner_(other.scanner_)
{
+  other.scanner_ = nullptr;
+}
+
+// Move assignment: free any scanner we currently own, then steal the
+// source's pointer. Self-assignment is guarded explicitly.
+LogScanner& LogScanner::operator=(LogScanner&& other) noexcept {
+  if (this != &other) {
+    Destroy();
+    scanner_ = other.scanner_;
+    other.scanner_ = nullptr;
+  }
+  return *this;
+}
+
+// True while this wrapper owns a live Rust scanner.
+bool LogScanner::Available() const { return scanner_ != nullptr; }
+
+// Subscribes to one bucket starting at `start_offset`; errors are returned
+// via Result.
+Result LogScanner::Subscribe(int32_t bucket_id, int64_t start_offset) {
+  if (!Available()) {
+    return utils::make_error(1, "LogScanner not available");
+  }
+
+  auto ffi_result = scanner_->subscribe(bucket_id, start_offset);
+  return utils::from_ffi_result(ffi_result);
+}
+
+// Polls for records with the given timeout; on success the converted batch
+// is stored in `out`, otherwise the error Result is returned and `out` is
+// left untouched.
+Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
+  if (!Available()) {
+    return utils::make_error(1, "LogScanner not available");
+  }
+
+  auto ffi_result = scanner_->poll(timeout_ms);
+  auto result = utils::from_ffi_result(ffi_result.result);
+  if (!result.Ok()) {
+    return result;
+  }
+
+  out = utils::from_ffi_scan_records(ffi_result.scan_records);
+  return utils::make_ok();
+}
+
+} // namespace fluss
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
new file mode 100644
index 0000000..d3bab38
--- /dev/null
+++ b/bindings/cpp/src/types.rs
@@ -0,0 +1,485 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{Result, anyhow};
+use arrow::array::{
+ Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray,
Time32SecondArray,
+ Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fcore::row::InternalRow;
+use fluss as fcore;
+
// Integer tags for Fluss column data types crossing the C++/Rust boundary.
// NOTE(review): these presumably mirror constants declared on the C++ side
// (bindings/cpp/include/fluss.hpp) — keep both sides in sync when extending.
pub const DATA_TYPE_BOOLEAN: i32 = 1;
pub const DATA_TYPE_TINYINT: i32 = 2;
pub const DATA_TYPE_SMALLINT: i32 = 3;
pub const DATA_TYPE_INT: i32 = 4;
pub const DATA_TYPE_BIGINT: i32 = 5;
pub const DATA_TYPE_FLOAT: i32 = 6;
pub const DATA_TYPE_DOUBLE: i32 = 7;
pub const DATA_TYPE_STRING: i32 = 8;
pub const DATA_TYPE_BYTES: i32 = 9;
pub const DATA_TYPE_DATE: i32 = 10;
pub const DATA_TYPE_TIME: i32 = 11;
pub const DATA_TYPE_TIMESTAMP: i32 = 12;
pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;

// Integer tags selecting which payload field of an `FfiDatum` is meaningful
// (the struct carries one slot per primitive kind; the tag picks the live one).
pub const DATUM_TYPE_NULL: i32 = 0;
pub const DATUM_TYPE_BOOL: i32 = 1;
pub const DATUM_TYPE_INT32: i32 = 2;
pub const DATUM_TYPE_INT64: i32 = 3;
pub const DATUM_TYPE_FLOAT32: i32 = 4;
pub const DATUM_TYPE_FLOAT64: i32 = 5;
pub const DATUM_TYPE_STRING: i32 = 6;
pub const DATUM_TYPE_BYTES: i32 = 7;
+
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+ match dt {
+ DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
+ DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
+ DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
+ DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
+ DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
+ DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
+ DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
+ DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
+ DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
+ DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
+ DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
+ DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
+ DATA_TYPE_TIMESTAMP_LTZ =>
Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+ _ => Err(anyhow!("Unknown data type: {}", dt)),
+ }
+}
+
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+ match dt {
+ fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+ fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+ fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+ fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
+ fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
+ fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
+ fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
+ fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
+ fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
+ fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
+ fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
+ fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+ fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+ _ => 0,
+ }
+}
+
+pub fn ffi_descriptor_to_core(
+ descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+ let mut schema_builder = fcore::metadata::Schema::builder();
+
+ for col in &descriptor.schema.columns {
+ let dt = ffi_data_type_to_core(col.data_type)?;
+ schema_builder = schema_builder.column(&col.name, dt);
+ if !col.comment.is_empty() {
+ schema_builder = schema_builder.with_comment(&col.comment);
+ }
+ }
+
+ if !descriptor.schema.primary_keys.is_empty() {
+ schema_builder =
schema_builder.primary_key(descriptor.schema.primary_keys.clone());
+ }
+
+ let schema = schema_builder.build()?;
+
+ let mut builder = fcore::metadata::TableDescriptor::builder()
+ .schema(schema)
+ .partitioned_by(descriptor.partition_keys.clone());
+
+ if descriptor.bucket_count > 0 {
+ builder = builder.distributed_by(
+ Some(descriptor.bucket_count),
+ descriptor.bucket_keys.clone(),
+ );
+ } else {
+ builder = builder.distributed_by(None, descriptor.bucket_keys.clone());
+ }
+
+ for prop in &descriptor.properties {
+ builder = builder.property(&prop.key, &prop.value);
+ }
+
+ if !descriptor.comment.is_empty() {
+ builder = builder.comment(&descriptor.comment);
+ }
+
+ Ok(builder.build()?)
+}
+
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) ->
ffi::FfiTableInfo {
+ let schema = info.get_schema();
+ let columns: Vec<ffi::FfiColumn> = schema
+ .columns()
+ .iter()
+ .map(|col| ffi::FfiColumn {
+ name: col.name().to_string(),
+ data_type: core_data_type_to_ffi(col.data_type()),
+ comment: col.comment().unwrap_or("").to_string(),
+ })
+ .collect();
+
+ let primary_keys: Vec<String> = schema
+ .primary_key()
+ .map(|pk| pk.column_names().to_vec())
+ .unwrap_or_default();
+
+ let properties: Vec<ffi::HashMapValue> = info
+ .get_properties()
+ .iter()
+ .map(|(k, v)| ffi::HashMapValue {
+ key: k.clone(),
+ value: v.clone(),
+ })
+ .collect();
+
+ ffi::FfiTableInfo {
+ table_id: info.get_table_id(),
+ schema_id: info.get_schema_id(),
+ table_path: ffi::FfiTablePath {
+ database_name: info.get_table_path().database().to_string(),
+ table_name: info.get_table_path().table().to_string(),
+ },
+ created_time: info.get_created_time(),
+ modified_time: info.get_modified_time(),
+ primary_keys: info.get_primary_keys().clone(),
+ bucket_keys: info.get_bucket_keys().to_vec(),
+ partition_keys: info.get_partition_keys().to_vec(),
+ num_buckets: info.get_num_buckets(),
+ has_primary_key: info.has_primary_key(),
+ is_partitioned: info.is_partitioned(),
+ properties,
+ comment: info.get_comment().unwrap_or("").to_string(),
+ schema: ffi::FfiSchema {
+ columns,
+ primary_keys,
+ },
+ }
+}
+
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+ ffi::FfiTableInfo {
+ table_id: 0,
+ schema_id: 0,
+ table_path: ffi::FfiTablePath {
+ database_name: String::new(),
+ table_name: String::new(),
+ },
+ created_time: 0,
+ modified_time: 0,
+ primary_keys: vec![],
+ bucket_keys: vec![],
+ partition_keys: vec![],
+ num_buckets: 0,
+ has_primary_key: false,
+ is_partitioned: false,
+ properties: vec![],
+ comment: String::new(),
+ schema: ffi::FfiSchema {
+ columns: vec![],
+ primary_keys: vec![],
+ },
+ }
+}
+
+pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_>
{
+ use fcore::row::Datum;
+
+ let mut generic_row = fcore::row::GenericRow::new();
+
+ for (idx, field) in row.fields.iter().enumerate() {
+ let datum = match field.datum_type {
+ DATUM_TYPE_NULL => Datum::Null,
+ DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
+ DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+ DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
+ DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()),
+ DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
+ DATUM_TYPE_STRING => Datum::String(field.string_val.as_str()),
+ // todo: avoid copy bytes for blob
+ DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.clone().into()),
+ _ => Datum::Null,
+ };
+ generic_row.set_field(idx, datum);
+ }
+
+ generic_row
+}
+
+pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) ->
ffi::FfiScanRecords {
+ let mut ffi_records = Vec::new();
+
+ // Iterate over all buckets and their records
+ for bucket_records in records.records_by_buckets().values() {
+ for record in bucket_records {
+ let row = record.row();
+ let fields = core_row_to_ffi_fields(row);
+
+ ffi_records.push(ffi::FfiScanRecord {
+ offset: record.offset(),
+ timestamp: record.timestamp(),
+ row: ffi::FfiGenericRow { fields },
+ });
+ }
+ }
+
+ ffi::FfiScanRecords {
+ records: ffi_records,
+ }
+}
+
+fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum>
{
+ fn new_datum(datum_type: i32) -> ffi::FfiDatum {
+ ffi::FfiDatum {
+ datum_type,
+ bool_val: false,
+ i32_val: 0,
+ i64_val: 0,
+ f32_val: 0.0,
+ f64_val: 0.0,
+ string_val: String::new(),
+ bytes_val: vec![],
+ }
+ }
+
+ let record_batch = row.get_record_batch();
+ let schema = record_batch.schema();
+ let row_id = row.get_row_id();
+
+ let mut fields = Vec::with_capacity(schema.fields().len());
+
+ for (i, field) in schema.fields().iter().enumerate() {
+ if row.is_null_at(i) {
+ fields.push(new_datum(DATUM_TYPE_NULL));
+ continue;
+ }
+
+ let datum = match field.data_type() {
+ ArrowDataType::Boolean => {
+ let mut datum = new_datum(DATUM_TYPE_BOOL);
+ datum.bool_val = row.get_boolean(i);
+ datum
+ }
+ ArrowDataType::Int8 => {
+ let mut datum = new_datum(DATUM_TYPE_INT32);
+ datum.i32_val = row.get_byte(i) as i32;
+ datum
+ }
+ ArrowDataType::Int16 => {
+ let mut datum = new_datum(DATUM_TYPE_INT32);
+ datum.i32_val = row.get_short(i) as i32;
+ datum
+ }
+ ArrowDataType::Int32 => {
+ let mut datum = new_datum(DATUM_TYPE_INT32);
+ datum.i32_val = row.get_int(i);
+ datum
+ }
+ ArrowDataType::Int64 => {
+ let mut datum = new_datum(DATUM_TYPE_INT64);
+ datum.i64_val = row.get_long(i);
+ datum
+ }
+ ArrowDataType::Float32 => {
+ let mut datum = new_datum(DATUM_TYPE_FLOAT32);
+ datum.f32_val = row.get_float(i);
+ datum
+ }
+ ArrowDataType::Float64 => {
+ let mut datum = new_datum(DATUM_TYPE_FLOAT64);
+ datum.f64_val = row.get_double(i);
+ datum
+ }
+ ArrowDataType::Utf8 => {
+ let mut datum = new_datum(DATUM_TYPE_STRING);
+ // todo: avoid copy string
+ datum.string_val = row.get_string(i).to_string();
+ datum
+ }
+ ArrowDataType::LargeUtf8 => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<LargeStringArray>()
+ .expect("LargeUtf8 column expected");
+ let mut datum = new_datum(DATUM_TYPE_STRING);
+ datum.string_val = array.value(row_id).to_string();
+ datum
+ }
+ ArrowDataType::Binary => {
+ let mut datum = new_datum(DATUM_TYPE_BYTES);
+ // todo: avoid copy bytes for blob
+ datum.bytes_val = row.get_bytes(i);
+ datum
+ }
+ ArrowDataType::FixedSizeBinary(len) => {
+ let mut datum = new_datum(DATUM_TYPE_BYTES);
+ datum.bytes_val = row.get_binary(i, *len as usize);
+ datum
+ }
+ ArrowDataType::LargeBinary => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<LargeBinaryArray>()
+ .expect("LargeBinary column expected");
+ let mut datum = new_datum(DATUM_TYPE_BYTES);
+ datum.bytes_val = array.value(row_id).to_vec();
+ datum
+ }
+ ArrowDataType::Date32 => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .expect("Date32 column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT32);
+ datum.i32_val = array.value(row_id);
+ datum
+ }
+ ArrowDataType::Timestamp(unit, _) => match unit {
+ TimeUnit::Second => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<TimestampSecondArray>()
+ .expect("Timestamp(second) column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT64);
+ datum.i64_val = array.value(row_id);
+ datum
+ }
+ TimeUnit::Millisecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .expect("Timestamp(millisecond) column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT64);
+ datum.i64_val = array.value(row_id);
+ datum
+ }
+ TimeUnit::Microsecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .expect("Timestamp(microsecond) column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT64);
+ datum.i64_val = array.value(row_id);
+ datum
+ }
+ TimeUnit::Nanosecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .expect("Timestamp(nanosecond) column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT64);
+ datum.i64_val = array.value(row_id);
+ datum
+ }
+ },
+ ArrowDataType::Time32(unit) => match unit {
+ TimeUnit::Second => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<Time32SecondArray>()
+ .expect("Time32(second) column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT32);
+ datum.i32_val = array.value(row_id);
+ datum
+ }
+ TimeUnit::Millisecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<Time32MillisecondArray>()
+ .expect("Time32(millisecond) column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT32);
+ datum.i32_val = array.value(row_id);
+ datum
+ }
+ _ => panic!(
+ "Will never come here. Unsupported Time32 unit for column
{}",
+ i
+ ),
+ },
+ ArrowDataType::Time64(unit) => match unit {
+ TimeUnit::Microsecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<Time64MicrosecondArray>()
+ .expect("Time64(microsecond) column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT64);
+ datum.i64_val = array.value(row_id);
+ datum
+ }
+ TimeUnit::Nanosecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<Time64NanosecondArray>()
+ .expect("Time64(nanosecond) column expected");
+ let mut datum = new_datum(DATUM_TYPE_INT64);
+ datum.i64_val = array.value(row_id);
+ datum
+ }
+ _ => panic!(
+ "Will never come here. Unsupported Time64 unit for column
{}",
+ i
+ ),
+ },
+ other => panic!(
+ "Will never come here. Unsupported Arrow data type for column
{}: {:?}",
+ i, other
+ ),
+ };
+
+ fields.push(datum);
+ }
+
+ fields
+}
+
+pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) ->
ffi::FfiLakeSnapshot {
+ let bucket_offsets: Vec<ffi::FfiBucketOffset> = snapshot
+ .table_buckets_offset
+ .iter()
+ .map(|(bucket, offset)| ffi::FfiBucketOffset {
+ table_id: bucket.table_id(),
+ partition_id: bucket.partition_id().unwrap_or(-1),
+ bucket_id: bucket.bucket_id(),
+ offset: *offset,
+ })
+ .collect();
+
+ ffi::FfiLakeSnapshot {
+ snapshot_id: snapshot.snapshot_id,
+ bucket_offsets,
+ }
+}
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 0857496..92f600e 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -18,7 +18,7 @@
use clap::Parser;
use serde::{Deserialize, Serialize};
-#[derive(Parser, Debug, Clone, Deserialize, Serialize, Default)]
+#[derive(Parser, Debug, Clone, Deserialize, Serialize)]
#[command(author, version, about, long_about = None)]
pub struct Config {
#[arg(long)]
@@ -37,3 +37,15 @@ pub struct Config {
#[arg(long, default_value_t = 2 * 1024 * 1024)]
pub writer_batch_size: i32,
}
+
/// Manual `Default` so `Config` can be built without parsing CLI arguments
/// (needed by the C++ bindings, which construct a config programmatically).
///
/// NOTE(review): these literals appear to duplicate the clap
/// `default_value_t` attributes on the struct fields (writer_batch_size's
/// 2 MiB visibly matches); keep the two in sync — confirm the others.
impl Default for Config {
    fn default() -> Self {
        Self {
            bootstrap_server: None,
            // 10 MiB maximum request size
            request_max_size: 10 * 1024 * 1024,
            writer_acks: String::from("all"),
            writer_retries: i32::MAX,
            // 2 MiB writer batch — matches the clap default above
            writer_batch_size: 2 * 1024 * 1024,
        }
    }
}