This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 257eb7d chore: error handling fix CPP, for pointer returning methods (#409)
257eb7d is described below
commit 257eb7dd3c16d8312f71286f0d420478277384d8
Author: Anton Borisov <[email protected]>
AuthorDate: Tue Mar 3 11:19:58 2026 +0000
chore: error handling fix CPP, for pointer returning methods (#409)
---
bindings/cpp/src/connection.cpp | 40 ++---
bindings/cpp/src/ffi_converter.hpp | 8 +
bindings/cpp/src/lib.rs | 298 ++++++++++++++++++++---------------
bindings/cpp/src/table.cpp | 107 ++++++-------
bindings/cpp/test/test_sasl_auth.cpp | 5 +-
5 files changed, 243 insertions(+), 215 deletions(-)
diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
index dcf8578..6cd7301 100644
--- a/bindings/cpp/src/connection.cpp
+++ b/bindings/cpp/src/connection.cpp
@@ -47,15 +47,13 @@ Connection& Connection::operator=(Connection&& other) noexcept {
}
Result Connection::Create(const Configuration& config, Connection& out) {
- try {
- auto ffi_config = utils::to_ffi_config(config);
- out.conn_ = ffi::new_connection(ffi_config);
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
- } catch (const std::exception& e) {
- return utils::make_client_error(e.what());
+ auto ffi_config = utils::to_ffi_config(config);
+ auto ffi_result = ffi::new_connection(ffi_config);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out.conn_ = utils::ptr_from_ffi<ffi::Connection>(ffi_result);
}
+ return result;
}
bool Connection::Available() const { return conn_ != nullptr; }
@@ -65,14 +63,12 @@ Result Connection::GetAdmin(Admin& out) {
return utils::make_client_error("Connection not available");
}
- try {
- out.admin_ = conn_->get_admin();
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
- } catch (const std::exception& e) {
- return utils::make_client_error(e.what());
+ auto ffi_result = conn_->get_admin();
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out.admin_ = utils::ptr_from_ffi<ffi::Admin>(ffi_result);
}
+ return result;
}
Result Connection::GetTable(const TablePath& table_path, Table& out) {
@@ -80,15 +76,13 @@ Result Connection::GetTable(const TablePath& table_path, Table& out) {
return utils::make_client_error("Connection not available");
}
- try {
- auto ffi_path = utils::to_ffi_table_path(table_path);
- out.table_ = conn_->get_table(ffi_path);
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
- } catch (const std::exception& e) {
- return utils::make_client_error(e.what());
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+ auto ffi_result = conn_->get_table(ffi_path);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out.table_ = utils::ptr_from_ffi<ffi::Table>(ffi_result);
}
+ return result;
}
} // namespace fluss
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 3375761..93a60bf 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -19,6 +19,8 @@
#pragma once
+#include <cassert>
+
#include "fluss.hpp"
#include "lib.rs.h"
@@ -37,6 +39,12 @@ inline Result from_ffi_result(const ffi::FfiResult& ffi_result) {
return Result{ffi_result.error_code,
std::string(ffi_result.error_message)};
}
+template <typename T>
+inline T* ptr_from_ffi(const ffi::FfiPtrResult& r) {
+ assert(r.ptr != 0 && "ptr_from_ffi: null pointer in FfiPtrResult");
+ return reinterpret_cast<T*>(r.ptr);
+}
+
inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
ffi::FfiTablePath ffi_path;
ffi_path.database_name = rust::String(path.database_name);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index c310fc8..366a198 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -247,6 +247,11 @@ mod ffi {
server_nodes: Vec<FfiServerNode>,
}
+ struct FfiPtrResult {
+ result: FfiResult,
+ ptr: usize,
+ }
+
extern "Rust" {
type Connection;
type Admin;
@@ -263,13 +268,10 @@ mod ffi {
type LookupResultInner;
// Connection
- // TODO: all Result<*mut T> methods lose server error codes (mapped to
CLIENT_ERROR).
- // Fix by introducing some struct like { result: FfiResult, ptr: i64
} to preserve error
- // codes from the server, matching how Rust and Python bindings handle
errors.
- fn new_connection(config: &FfiConfig) -> Result<*mut Connection>;
+ fn new_connection(config: &FfiConfig) -> FfiPtrResult;
unsafe fn delete_connection(conn: *mut Connection);
- fn get_admin(self: &Connection) -> Result<*mut Admin>;
- fn get_table(self: &Connection, table_path: &FfiTablePath) ->
Result<*mut Table>;
+ fn get_admin(self: &Connection) -> FfiPtrResult;
+ fn get_table(self: &Connection, table_path: &FfiTablePath) ->
FfiPtrResult;
// Admin
unsafe fn delete_admin(admin: *mut Admin);
@@ -344,20 +346,13 @@ mod ffi {
// Table
unsafe fn delete_table(table: *mut Table);
- fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>;
- fn create_scanner(
- self: &Table,
- column_indices: Vec<usize>,
- batch: bool,
- ) -> Result<*mut LogScanner>;
+ fn new_append_writer(self: &Table) -> FfiPtrResult;
+ fn create_scanner(self: &Table, column_indices: Vec<usize>, batch:
bool) -> FfiPtrResult;
fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
fn get_table_path(self: &Table) -> FfiTablePath;
fn has_primary_key(self: &Table) -> bool;
- fn create_upsert_writer(
- self: &Table,
- column_indices: Vec<usize>,
- ) -> Result<*mut UpsertWriter>;
- fn new_lookuper(self: &Table) -> Result<*mut Lookuper>;
+ fn create_upsert_writer(self: &Table, column_indices: Vec<usize>) ->
FfiPtrResult;
+ fn new_lookuper(self: &Table) -> FfiPtrResult;
// GenericRowInner — opaque row for writes
fn new_generic_row(field_count: usize) -> Box<GenericRowInner>;
@@ -378,21 +373,22 @@ mod ffi {
// AppendWriter
unsafe fn delete_append_writer(writer: *mut AppendWriter);
- fn append(self: &mut AppendWriter, row: &GenericRowInner) ->
Result<Box<WriteResult>>;
+ fn append(self: &mut AppendWriter, row: &GenericRowInner) ->
FfiPtrResult;
fn append_arrow_batch(
self: &mut AppendWriter,
array_ptr: usize,
schema_ptr: usize,
- ) -> Result<Box<WriteResult>>;
+ ) -> FfiPtrResult;
fn flush(self: &mut AppendWriter) -> FfiResult;
- // WriteResult — dropped automatically via rust::Box, or call wait()
for ack
+ // WriteResult
+ unsafe fn delete_write_result(wr: *mut WriteResult);
fn wait(self: &mut WriteResult) -> FfiResult;
// UpsertWriter
unsafe fn delete_upsert_writer(writer: *mut UpsertWriter);
- fn upsert(self: &mut UpsertWriter, row: &GenericRowInner) ->
Result<Box<WriteResult>>;
- fn delete_row(self: &mut UpsertWriter, row: &GenericRowInner) ->
Result<Box<WriteResult>>;
+ fn upsert(self: &mut UpsertWriter, row: &GenericRowInner) ->
FfiPtrResult;
+ fn delete_row(self: &mut UpsertWriter, row: &GenericRowInner) ->
FfiPtrResult;
fn upsert_flush(self: &mut UpsertWriter) -> FfiResult;
// Lookuper
@@ -630,13 +626,34 @@ fn err_from_core_error(e: &fcore::error::Error) -> ffi::FfiResult {
}
}
+fn ok_ptr(ptr: usize) -> ffi::FfiPtrResult {
+ ffi::FfiPtrResult {
+ result: ok_result(),
+ ptr,
+ }
+}
+
+fn client_err_ptr(msg: String) -> ffi::FfiPtrResult {
+ ffi::FfiPtrResult {
+ result: client_err(msg),
+ ptr: 0usize,
+ }
+}
+
+fn err_ptr_from_core(e: &fcore::error::Error) -> ffi::FfiPtrResult {
+ ffi::FfiPtrResult {
+ result: err_from_core_error(e),
+ ptr: 0usize,
+ }
+}
+
// Connection implementation
-fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
+fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
let assigner_type = match config.writer_bucket_no_key_assigner.as_str() {
"round_robin" => fluss::config::NoKeyAssigner::RoundRobin,
"sticky" => fluss::config::NoKeyAssigner::Sticky,
other => {
- return Err(format!(
+ return client_err_ptr(format!(
"Unknown bucket assigner type: '{other}', expected 'sticky' or
'round_robin'"
));
}
@@ -664,10 +681,10 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
match conn {
Ok(c) => {
- let conn = Box::into_raw(Box::new(Connection { inner: Arc::new(c)
}));
- Ok(conn)
+ let ptr = Box::into_raw(Box::new(Connection { inner: Arc::new(c)
}));
+ ok_ptr(ptr as usize)
}
- Err(e) => Err(format!("Failed to connect: {e}")),
+ Err(e) => err_ptr_from_core(&e),
}
}
@@ -680,19 +697,19 @@ unsafe fn delete_connection(conn: *mut Connection) {
}
impl Connection {
- fn get_admin(&self) -> Result<*mut Admin, String> {
+ fn get_admin(&self) -> ffi::FfiPtrResult {
let admin_result = RUNTIME.block_on(async {
self.inner.get_admin().await });
match admin_result {
Ok(admin) => {
- let admin = Box::into_raw(Box::new(Admin { inner: admin }));
- Ok(admin)
+ let ptr = Box::into_raw(Box::new(Admin { inner: admin }));
+ ok_ptr(ptr as usize)
}
- Err(e) => Err(format!("Failed to get admin: {e}")),
+ Err(e) => err_ptr_from_core(&e),
}
}
- fn get_table(&self, table_path: &ffi::FfiTablePath) -> Result<*mut Table,
String> {
+ fn get_table(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiPtrResult {
let path = fcore::metadata::TablePath::new(
table_path.database_name.clone(),
table_path.table_name.clone(),
@@ -702,16 +719,16 @@ impl Connection {
match table_result {
Ok(t) => {
- let table = Box::into_raw(Box::new(Table {
+ let ptr = Box::into_raw(Box::new(Table {
connection: self.inner.clone(),
metadata: t.metadata().clone(),
table_info: t.get_table_info().clone(),
table_path: t.table_path().clone(),
has_pk: t.has_primary_key(),
}));
- Ok(table)
+ ok_ptr(ptr as usize)
}
- Err(e) => Err(format!("Failed to get table: {e}")),
+ Err(e) => err_ptr_from_core(&e),
}
}
}
@@ -1196,29 +1213,27 @@ impl Table {
.collect()
}
- fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
+ fn new_append_writer(&self) -> ffi::FfiPtrResult {
let _enter = RUNTIME.enter();
- let table_append = self
- .fluss_table()
- .new_append()
- .map_err(|e| format!("Failed to create append: {e}"))?;
+ let table_append = match self.fluss_table().new_append() {
+ Ok(a) => a,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- let writer = table_append
- .create_writer()
- .map_err(|e| format!("Failed to create writer: {e}"))?;
+ let writer = match table_append.create_writer() {
+ Ok(w) => w,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- Ok(Box::into_raw(Box::new(AppendWriter {
+ let ptr = Box::into_raw(Box::new(AppendWriter {
inner: writer,
table_info: self.table_info.clone(),
- })))
+ }));
+ ok_ptr(ptr as usize)
}
- fn create_scanner(
- &self,
- column_indices: Vec<usize>,
- batch: bool,
- ) -> Result<*mut LogScanner, String> {
+ fn create_scanner(&self, column_indices: Vec<usize>, batch: bool) ->
ffi::FfiPtrResult {
RUNTIME.block_on(async {
let fluss_table = self.fluss_table();
let scan = fluss_table.new_scan();
@@ -1226,29 +1241,34 @@ impl Table {
let (projected_columns, scan) = if column_indices.is_empty() {
(self.table_info.get_schema().columns().to_vec(), scan)
} else {
- let cols = self.resolve_projected_columns(&column_indices)?;
- let scan = scan
- .project(&column_indices)
- .map_err(|e| format!("Failed to project columns: {e}"))?;
+ let cols = match
self.resolve_projected_columns(&column_indices) {
+ Ok(c) => c,
+ Err(e) => return client_err_ptr(e),
+ };
+ let scan = match scan.project(&column_indices) {
+ Ok(s) => s,
+ Err(e) => return err_ptr_from_core(&e),
+ };
(cols, scan)
};
let scanner = if batch {
- let s = scan
- .create_record_batch_log_scanner()
- .map_err(|e| format!("Failed to create record batch log
scanner: {e}"))?;
- ScannerKind::Batch(s)
+ match scan.create_record_batch_log_scanner() {
+ Ok(s) => ScannerKind::Batch(s),
+ Err(e) => return err_ptr_from_core(&e),
+ }
} else {
- let s = scan
- .create_log_scanner()
- .map_err(|e| format!("Failed to create log scanner:
{e}"))?;
- ScannerKind::Record(s)
+ match scan.create_log_scanner() {
+ Ok(s) => ScannerKind::Record(s),
+ Err(e) => return err_ptr_from_core(&e),
+ }
};
- Ok(Box::into_raw(Box::new(LogScanner {
+ let ptr = Box::into_raw(Box::new(LogScanner {
scanner,
projected_columns,
- })))
+ }));
+ ok_ptr(ptr as usize)
})
}
@@ -1267,51 +1287,53 @@ impl Table {
self.has_pk
}
- fn create_upsert_writer(
- &self,
- column_indices: Vec<usize>,
- ) -> Result<*mut UpsertWriter, String> {
+ fn create_upsert_writer(&self, column_indices: Vec<usize>) ->
ffi::FfiPtrResult {
let _enter = RUNTIME.enter();
- let table_upsert = self
- .fluss_table()
- .new_upsert()
- .map_err(|e| format!("Failed to create upsert: {e}"))?;
+ let table_upsert = match self.fluss_table().new_upsert() {
+ Ok(u) => u,
+ Err(e) => return err_ptr_from_core(&e),
+ };
let table_upsert = if column_indices.is_empty() {
table_upsert
} else {
- table_upsert
- .partial_update(Some(column_indices))
- .map_err(|e| format!("Failed to set partial update columns:
{e}"))?
+ match table_upsert.partial_update(Some(column_indices)) {
+ Ok(u) => u,
+ Err(e) => return err_ptr_from_core(&e),
+ }
};
- let writer = table_upsert
- .create_writer()
- .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
+ let writer = match table_upsert.create_writer() {
+ Ok(w) => w,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- Ok(Box::into_raw(Box::new(UpsertWriter {
+ let ptr = Box::into_raw(Box::new(UpsertWriter {
inner: writer,
table_info: self.table_info.clone(),
- })))
+ }));
+ ok_ptr(ptr as usize)
}
- fn new_lookuper(&self) -> Result<*mut Lookuper, String> {
+ fn new_lookuper(&self) -> ffi::FfiPtrResult {
let _enter = RUNTIME.enter();
- let table_lookup = self
- .fluss_table()
- .new_lookup()
- .map_err(|e| format!("Failed to create lookup: {e}"))?;
+ let table_lookup = match self.fluss_table().new_lookup() {
+ Ok(l) => l,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- let lookuper = table_lookup
- .create_lookuper()
- .map_err(|e| format!("Failed to create lookuper: {e}"))?;
+ let lookuper = match table_lookup.create_lookuper() {
+ Ok(l) => l,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- Ok(Box::into_raw(Box::new(Lookuper {
+ let ptr = Box::into_raw(Box::new(Lookuper {
inner: lookuper,
table_info: self.table_info.clone(),
- })))
+ }));
+ ok_ptr(ptr as usize)
}
}
@@ -1325,26 +1347,25 @@ unsafe fn delete_append_writer(writer: *mut AppendWriter) {
}
impl AppendWriter {
- fn append(&mut self, row: &GenericRowInner) -> Result<Box<WriteResult>,
String> {
+ fn append(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
let schema = self.table_info.get_schema();
- let generic_row =
- types::resolve_row_types(&row.row, Some(schema)).map_err(|e|
e.to_string())?;
+ let generic_row = match types::resolve_row_types(&row.row,
Some(schema)) {
+ Ok(r) => r,
+ Err(e) => return client_err_ptr(e.to_string()),
+ };
- let result_future = self
- .inner
- .append(&generic_row)
- .map_err(|e| format!("Failed to append: {e}"))?;
+ let result_future = match self.inner.append(&generic_row) {
+ Ok(f) => f,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- Ok(Box::new(WriteResult {
+ let ptr = Box::into_raw(Box::new(WriteResult {
inner: Some(result_future),
- }))
+ }));
+ ok_ptr(ptr as usize)
}
- fn append_arrow_batch(
- &mut self,
- array_ptr: usize,
- schema_ptr: usize,
- ) -> Result<Box<WriteResult>, String> {
+ fn append_arrow_batch(&mut self, array_ptr: usize, schema_ptr: usize) ->
ffi::FfiPtrResult {
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
// Safety: C++ allocates these via `new ArrowArray/ArrowSchema` after a
@@ -1355,22 +1376,25 @@ impl AppendWriter {
// Safety: `from_ffi` requires that the array and schema conform to the
// Arrow C Data Interface, which is guaranteed by C++'s
ExportRecordBatch.
- let array_data = unsafe { arrow::ffi::from_ffi(ffi_array, &ffi_schema)
}
- .map_err(|e| format!("Failed to import Arrow batch: {e}"))?;
+ let array_data = match unsafe { arrow::ffi::from_ffi(ffi_array,
&ffi_schema) } {
+ Ok(d) => d,
+ Err(e) => return client_err_ptr(format!("Failed to import Arrow
batch: {e}")),
+ };
// ffi_array is consumed by from_ffi; ffi_schema is dropped here (Box
goes out of scope)
// Reconstruct RecordBatch from the imported StructArray data
let struct_array = arrow::array::StructArray::from(array_data);
let batch = arrow::record_batch::RecordBatch::from(struct_array);
- let result_future = self
- .inner
- .append_arrow_batch(batch)
- .map_err(|e| format!("Failed to append Arrow batch: {e}"))?;
+ let result_future = match self.inner.append_arrow_batch(batch) {
+ Ok(f) => f,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- Ok(Box::new(WriteResult {
+ let ptr = Box::into_raw(Box::new(WriteResult {
inner: Some(result_future),
- }))
+ }));
+ ok_ptr(ptr as usize)
}
fn flush(&mut self) -> ffi::FfiResult {
@@ -1383,6 +1407,14 @@ impl AppendWriter {
}
}
+unsafe fn delete_write_result(wr: *mut WriteResult) {
+ if !wr.is_null() {
+ unsafe {
+ drop(Box::from_raw(wr));
+ }
+ }
+}
+
impl WriteResult {
fn wait(&mut self) -> ffi::FfiResult {
if let Some(future) = self.inner.take() {
@@ -1417,36 +1449,42 @@ impl UpsertWriter {
row
}
- fn upsert(&mut self, row: &GenericRowInner) -> Result<Box<WriteResult>,
String> {
+ fn upsert(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
let schema = self.table_info.get_schema();
- let generic_row =
- types::resolve_row_types(&row.row, Some(schema)).map_err(|e|
e.to_string())?;
+ let generic_row = match types::resolve_row_types(&row.row,
Some(schema)) {
+ Ok(r) => r,
+ Err(e) => return client_err_ptr(e.to_string()),
+ };
let generic_row = self.pad_row(generic_row);
- let result_future = self
- .inner
- .upsert(&generic_row)
- .map_err(|e| format!("Failed to upsert: {e}"))?;
+ let result_future = match self.inner.upsert(&generic_row) {
+ Ok(f) => f,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- Ok(Box::new(WriteResult {
+ let ptr = Box::into_raw(Box::new(WriteResult {
inner: Some(result_future),
- }))
+ }));
+ ok_ptr(ptr as usize)
}
- fn delete_row(&mut self, row: &GenericRowInner) ->
Result<Box<WriteResult>, String> {
+ fn delete_row(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
let schema = self.table_info.get_schema();
- let generic_row =
- types::resolve_row_types(&row.row, Some(schema)).map_err(|e|
e.to_string())?;
+ let generic_row = match types::resolve_row_types(&row.row,
Some(schema)) {
+ Ok(r) => r,
+ Err(e) => return client_err_ptr(e.to_string()),
+ };
let generic_row = self.pad_row(generic_row);
- let result_future = self
- .inner
- .delete(&generic_row)
- .map_err(|e| format!("Failed to delete: {e}"))?;
+ let result_future = match self.inner.delete(&generic_row) {
+ Ok(f) => f,
+ Err(e) => return err_ptr_from_core(&e),
+ };
- Ok(Box::new(WriteResult {
+ let ptr = Box::into_raw(Box::new(WriteResult {
inner: Some(result_future),
- }))
+ }));
+ ok_ptr(ptr as usize)
}
fn upsert_flush(&mut self) -> ffi::FfiResult {
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index b0b7029..c49a644 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -578,14 +578,12 @@ Result TableAppend::CreateWriter(AppendWriter& out) {
return utils::make_client_error("Table not available");
}
- try {
- out = AppendWriter(table_->new_append_writer());
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
- } catch (const std::exception& e) {
- return utils::make_client_error(e.what());
+ auto ffi_result = table_->new_append_writer();
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out = AppendWriter(utils::ptr_from_ffi<ffi::AppendWriter>(ffi_result));
}
+ return result;
}
// ============================================================================
@@ -645,11 +643,14 @@ Result TableUpsert::CreateWriter(UpsertWriter& out) {
for (size_t idx : resolved_indices) {
rust_indices.push_back(idx);
}
- out =
UpsertWriter(table_->create_upsert_writer(std::move(rust_indices)));
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
+ auto ffi_result =
table_->create_upsert_writer(std::move(rust_indices));
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out =
UpsertWriter(utils::ptr_from_ffi<ffi::UpsertWriter>(ffi_result));
+ }
+ return result;
} catch (const std::exception& e) {
+ // ResolveNameProjection() may throw
return utils::make_client_error(e.what());
}
}
@@ -665,14 +666,12 @@ Result TableLookup::CreateLookuper(Lookuper& out) {
return utils::make_client_error("Table not available");
}
- try {
- out = Lookuper(table_->new_lookuper());
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
- } catch (const std::exception& e) {
- return utils::make_client_error(e.what());
+ auto ffi_result = table_->new_lookuper();
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out = Lookuper(utils::ptr_from_ffi<ffi::Lookuper>(ffi_result));
}
+ return result;
}
// ============================================================================
@@ -731,11 +730,14 @@ Result TableScan::DoCreateScanner(LogScanner& out, bool is_record_batch) {
for (size_t idx : resolved_indices) {
rust_indices.push_back(idx);
}
- out.scanner_ = table_->create_scanner(std::move(rust_indices),
is_record_batch);
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
+ auto ffi_result = table_->create_scanner(std::move(rust_indices),
is_record_batch);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out.scanner_ = utils::ptr_from_ffi<ffi::LogScanner>(ffi_result);
+ }
+ return result;
} catch (const std::exception& e) {
+ // ResolveNameProjection() may throw
return utils::make_client_error(e.what());
}
}
@@ -752,8 +754,7 @@ WriteResult::~WriteResult() noexcept { Destroy(); }
void WriteResult::Destroy() noexcept {
if (inner_) {
- // Reconstruct the rust::Box to let Rust drop the value
- rust::Box<ffi::WriteResult>::from_raw(inner_);
+ ffi::delete_write_result(inner_);
inner_ = nullptr;
}
}
@@ -827,15 +828,12 @@ Result AppendWriter::Append(const GenericRow& row, WriteResult& out) {
return utils::make_client_error("GenericRow not available");
}
- try {
- auto rust_box = writer_->append(*row.inner_);
- out = WriteResult(rust_box.into_raw());
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
- } catch (const std::exception& e) {
- return utils::make_client_error(e.what());
+ auto ffi_result = writer_->append(*row.inner_);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out = WriteResult(utils::ptr_from_ffi<ffi::WriteResult>(ffi_result));
}
+ return result;
}
Result AppendWriter::AppendArrowBatch(const
std::shared_ptr<arrow::RecordBatch>& batch) {
@@ -864,19 +862,16 @@ Result AppendWriter::AppendArrowBatch(const std::shared_ptr<arrow::RecordBatch>&
auto* array_heap = new ArrowArray(std::move(c_array));
auto* schema_heap = new ArrowSchema(std::move(c_schema));
- try {
- // Rust takes ownership of both pointers immediately via
Box::from_raw(),
- // so after this call (success or exception) C++ must NOT free them.
- auto result_box =
writer_->append_arrow_batch(reinterpret_cast<size_t>(array_heap),
-
reinterpret_cast<size_t>(schema_heap));
+ // Rust takes ownership of both pointers immediately via Box::from_raw(),
+ // so after this call C++ must NOT free them.
+ auto ffi_result =
writer_->append_arrow_batch(reinterpret_cast<size_t>(array_heap),
+
reinterpret_cast<size_t>(schema_heap));
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
out.Destroy();
- out.inner_ = result_box.into_raw();
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(std::string(e.what()));
- } catch (const std::exception& e) {
- return utils::make_client_error(std::string(e.what()));
+ out.inner_ = utils::ptr_from_ffi<ffi::WriteResult>(ffi_result);
}
+ return result;
}
Result AppendWriter::Flush() {
@@ -933,15 +928,12 @@ Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
return utils::make_client_error("GenericRow not available");
}
- try {
- auto rust_box = writer_->upsert(*row.inner_);
- out = WriteResult(rust_box.into_raw());
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
- } catch (const std::exception& e) {
- return utils::make_client_error(e.what());
+ auto ffi_result = writer_->upsert(*row.inner_);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out = WriteResult(utils::ptr_from_ffi<ffi::WriteResult>(ffi_result));
}
+ return result;
}
Result UpsertWriter::Delete(const GenericRow& row) {
@@ -957,15 +949,12 @@ Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
return utils::make_client_error("GenericRow not available");
}
- try {
- auto rust_box = writer_->delete_row(*row.inner_);
- out = WriteResult(rust_box.into_raw());
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_client_error(e.what());
- } catch (const std::exception& e) {
- return utils::make_client_error(e.what());
+ auto ffi_result = writer_->delete_row(*row.inner_);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out = WriteResult(utils::ptr_from_ffi<ffi::WriteResult>(ffi_result));
}
+ return result;
}
Result UpsertWriter::Flush() {
diff --git a/bindings/cpp/test/test_sasl_auth.cpp b/bindings/cpp/test/test_sasl_auth.cpp
index 2208db3..5a52a1a 100644
--- a/bindings/cpp/test/test_sasl_auth.cpp
+++ b/bindings/cpp/test/test_sasl_auth.cpp
@@ -89,8 +89,7 @@ TEST_F(SaslAuthTest, SaslConnectWithWrongPassword) {
fluss::Connection conn;
auto result = fluss::Connection::Create(config, conn);
ASSERT_FALSE(result.Ok());
- // TODO: error_code is CLIENT_ERROR (-2) because CXX Result<*mut T> loses
the server
- // error code. Should be AUTHENTICATE_EXCEPTION (46) once fixed
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::AUTHENTICATE_EXCEPTION);
EXPECT_NE(result.error_message.find("Authentication failed"),
std::string::npos)
<< "Expected 'Authentication failed' in: " << result.error_message;
}
@@ -106,7 +105,7 @@ TEST_F(SaslAuthTest, SaslConnectWithUnknownUser) {
fluss::Connection conn;
auto result = fluss::Connection::Create(config, conn);
ASSERT_FALSE(result.Ok());
- // TODO: same as above — should check error_code == AUTHENTICATE_EXCEPTION
once fixed.
+ EXPECT_EQ(result.error_code, fluss::ErrorCode::AUTHENTICATE_EXCEPTION);
EXPECT_NE(result.error_message.find("Authentication failed"),
std::string::npos)
<< "Expected 'Authentication failed' in: " << result.error_message;
}