This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 257eb7d  chore: error handling fix CPP, for pointer returning methods (#409)
257eb7d is described below

commit 257eb7dd3c16d8312f71286f0d420478277384d8
Author: Anton Borisov <[email protected]>
AuthorDate: Tue Mar 3 11:19:58 2026 +0000

    chore: error handling fix CPP, for pointer returning methods (#409)
---
 bindings/cpp/src/connection.cpp      |  40 ++---
 bindings/cpp/src/ffi_converter.hpp   |   8 +
 bindings/cpp/src/lib.rs              | 298 ++++++++++++++++++++---------------
 bindings/cpp/src/table.cpp           | 107 ++++++-------
 bindings/cpp/test/test_sasl_auth.cpp |   5 +-
 5 files changed, 243 insertions(+), 215 deletions(-)

diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
index dcf8578..6cd7301 100644
--- a/bindings/cpp/src/connection.cpp
+++ b/bindings/cpp/src/connection.cpp
@@ -47,15 +47,13 @@ Connection& Connection::operator=(Connection&& other) noexcept {
 }
 
 Result Connection::Create(const Configuration& config, Connection& out) {
-    try {
-        auto ffi_config = utils::to_ffi_config(config);
-        out.conn_ = ffi::new_connection(ffi_config);
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
-    } catch (const std::exception& e) {
-        return utils::make_client_error(e.what());
+    auto ffi_config = utils::to_ffi_config(config);
+    auto ffi_result = ffi::new_connection(ffi_config);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out.conn_ = utils::ptr_from_ffi<ffi::Connection>(ffi_result);
     }
+    return result;
 }
 
 bool Connection::Available() const { return conn_ != nullptr; }
@@ -65,14 +63,12 @@ Result Connection::GetAdmin(Admin& out) {
         return utils::make_client_error("Connection not available");
     }
 
-    try {
-        out.admin_ = conn_->get_admin();
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
-    } catch (const std::exception& e) {
-        return utils::make_client_error(e.what());
+    auto ffi_result = conn_->get_admin();
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out.admin_ = utils::ptr_from_ffi<ffi::Admin>(ffi_result);
     }
+    return result;
 }
 
 Result Connection::GetTable(const TablePath& table_path, Table& out) {
@@ -80,15 +76,13 @@ Result Connection::GetTable(const TablePath& table_path, 
Table& out) {
         return utils::make_client_error("Connection not available");
     }
 
-    try {
-        auto ffi_path = utils::to_ffi_table_path(table_path);
-        out.table_ = conn_->get_table(ffi_path);
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
-    } catch (const std::exception& e) {
-        return utils::make_client_error(e.what());
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    auto ffi_result = conn_->get_table(ffi_path);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out.table_ = utils::ptr_from_ffi<ffi::Table>(ffi_result);
     }
+    return result;
 }
 
 }  // namespace fluss
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 3375761..93a60bf 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -19,6 +19,8 @@
 
 #pragma once
 
+#include <cassert>
+
 #include "fluss.hpp"
 #include "lib.rs.h"
 
@@ -37,6 +39,12 @@ inline Result from_ffi_result(const ffi::FfiResult& ffi_result) {
     return Result{ffi_result.error_code, 
std::string(ffi_result.error_message)};
 }
 
+template <typename T>
+inline T* ptr_from_ffi(const ffi::FfiPtrResult& r) {
+    assert(r.ptr != 0 && "ptr_from_ffi: null pointer in FfiPtrResult");
+    return reinterpret_cast<T*>(r.ptr);
+}
+
 inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
     ffi::FfiTablePath ffi_path;
     ffi_path.database_name = rust::String(path.database_name);
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index c310fc8..366a198 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -247,6 +247,11 @@ mod ffi {
         server_nodes: Vec<FfiServerNode>,
     }
 
+    struct FfiPtrResult {
+        result: FfiResult,
+        ptr: usize,
+    }
+
     extern "Rust" {
         type Connection;
         type Admin;
@@ -263,13 +268,10 @@ mod ffi {
         type LookupResultInner;
 
         // Connection
-        // TODO: all Result<*mut T> methods lose server error codes (mapped to 
CLIENT_ERROR).
-        // Fix by introducing  some struct like { result: FfiResult, ptr: i64 
} to preserve error
-        // codes from the server, matching how Rust and Python bindings handle 
errors.
-        fn new_connection(config: &FfiConfig) -> Result<*mut Connection>;
+        fn new_connection(config: &FfiConfig) -> FfiPtrResult;
         unsafe fn delete_connection(conn: *mut Connection);
-        fn get_admin(self: &Connection) -> Result<*mut Admin>;
-        fn get_table(self: &Connection, table_path: &FfiTablePath) -> 
Result<*mut Table>;
+        fn get_admin(self: &Connection) -> FfiPtrResult;
+        fn get_table(self: &Connection, table_path: &FfiTablePath) -> 
FfiPtrResult;
 
         // Admin
         unsafe fn delete_admin(admin: *mut Admin);
@@ -344,20 +346,13 @@ mod ffi {
 
         // Table
         unsafe fn delete_table(table: *mut Table);
-        fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>;
-        fn create_scanner(
-            self: &Table,
-            column_indices: Vec<usize>,
-            batch: bool,
-        ) -> Result<*mut LogScanner>;
+        fn new_append_writer(self: &Table) -> FfiPtrResult;
+        fn create_scanner(self: &Table, column_indices: Vec<usize>, batch: 
bool) -> FfiPtrResult;
         fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
         fn get_table_path(self: &Table) -> FfiTablePath;
         fn has_primary_key(self: &Table) -> bool;
-        fn create_upsert_writer(
-            self: &Table,
-            column_indices: Vec<usize>,
-        ) -> Result<*mut UpsertWriter>;
-        fn new_lookuper(self: &Table) -> Result<*mut Lookuper>;
+        fn create_upsert_writer(self: &Table, column_indices: Vec<usize>) -> 
FfiPtrResult;
+        fn new_lookuper(self: &Table) -> FfiPtrResult;
 
         // GenericRowInner — opaque row for writes
         fn new_generic_row(field_count: usize) -> Box<GenericRowInner>;
@@ -378,21 +373,22 @@ mod ffi {
 
         // AppendWriter
         unsafe fn delete_append_writer(writer: *mut AppendWriter);
-        fn append(self: &mut AppendWriter, row: &GenericRowInner) -> 
Result<Box<WriteResult>>;
+        fn append(self: &mut AppendWriter, row: &GenericRowInner) -> 
FfiPtrResult;
         fn append_arrow_batch(
             self: &mut AppendWriter,
             array_ptr: usize,
             schema_ptr: usize,
-        ) -> Result<Box<WriteResult>>;
+        ) -> FfiPtrResult;
         fn flush(self: &mut AppendWriter) -> FfiResult;
 
-        // WriteResult — dropped automatically via rust::Box, or call wait() 
for ack
+        // WriteResult
+        unsafe fn delete_write_result(wr: *mut WriteResult);
         fn wait(self: &mut WriteResult) -> FfiResult;
 
         // UpsertWriter
         unsafe fn delete_upsert_writer(writer: *mut UpsertWriter);
-        fn upsert(self: &mut UpsertWriter, row: &GenericRowInner) -> 
Result<Box<WriteResult>>;
-        fn delete_row(self: &mut UpsertWriter, row: &GenericRowInner) -> 
Result<Box<WriteResult>>;
+        fn upsert(self: &mut UpsertWriter, row: &GenericRowInner) -> 
FfiPtrResult;
+        fn delete_row(self: &mut UpsertWriter, row: &GenericRowInner) -> 
FfiPtrResult;
         fn upsert_flush(self: &mut UpsertWriter) -> FfiResult;
 
         // Lookuper
@@ -630,13 +626,34 @@ fn err_from_core_error(e: &fcore::error::Error) -> 
ffi::FfiResult {
     }
 }
 
+fn ok_ptr(ptr: usize) -> ffi::FfiPtrResult {
+    ffi::FfiPtrResult {
+        result: ok_result(),
+        ptr,
+    }
+}
+
+fn client_err_ptr(msg: String) -> ffi::FfiPtrResult {
+    ffi::FfiPtrResult {
+        result: client_err(msg),
+        ptr: 0usize,
+    }
+}
+
+fn err_ptr_from_core(e: &fcore::error::Error) -> ffi::FfiPtrResult {
+    ffi::FfiPtrResult {
+        result: err_from_core_error(e),
+        ptr: 0usize,
+    }
+}
+
 // Connection implementation
-fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
+fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
     let assigner_type = match config.writer_bucket_no_key_assigner.as_str() {
         "round_robin" => fluss::config::NoKeyAssigner::RoundRobin,
         "sticky" => fluss::config::NoKeyAssigner::Sticky,
         other => {
-            return Err(format!(
+            return client_err_ptr(format!(
                 "Unknown bucket assigner type: '{other}', expected 'sticky' or 
'round_robin'"
             ));
         }
@@ -664,10 +681,10 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut 
Connection, String> {
 
     match conn {
         Ok(c) => {
-            let conn = Box::into_raw(Box::new(Connection { inner: Arc::new(c) 
}));
-            Ok(conn)
+            let ptr = Box::into_raw(Box::new(Connection { inner: Arc::new(c) 
}));
+            ok_ptr(ptr as usize)
         }
-        Err(e) => Err(format!("Failed to connect: {e}")),
+        Err(e) => err_ptr_from_core(&e),
     }
 }
 
@@ -680,19 +697,19 @@ unsafe fn delete_connection(conn: *mut Connection) {
 }
 
 impl Connection {
-    fn get_admin(&self) -> Result<*mut Admin, String> {
+    fn get_admin(&self) -> ffi::FfiPtrResult {
         let admin_result = RUNTIME.block_on(async { 
self.inner.get_admin().await });
 
         match admin_result {
             Ok(admin) => {
-                let admin = Box::into_raw(Box::new(Admin { inner: admin }));
-                Ok(admin)
+                let ptr = Box::into_raw(Box::new(Admin { inner: admin }));
+                ok_ptr(ptr as usize)
             }
-            Err(e) => Err(format!("Failed to get admin: {e}")),
+            Err(e) => err_ptr_from_core(&e),
         }
     }
 
-    fn get_table(&self, table_path: &ffi::FfiTablePath) -> Result<*mut Table, 
String> {
+    fn get_table(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiPtrResult {
         let path = fcore::metadata::TablePath::new(
             table_path.database_name.clone(),
             table_path.table_name.clone(),
@@ -702,16 +719,16 @@ impl Connection {
 
         match table_result {
             Ok(t) => {
-                let table = Box::into_raw(Box::new(Table {
+                let ptr = Box::into_raw(Box::new(Table {
                     connection: self.inner.clone(),
                     metadata: t.metadata().clone(),
                     table_info: t.get_table_info().clone(),
                     table_path: t.table_path().clone(),
                     has_pk: t.has_primary_key(),
                 }));
-                Ok(table)
+                ok_ptr(ptr as usize)
             }
-            Err(e) => Err(format!("Failed to get table: {e}")),
+            Err(e) => err_ptr_from_core(&e),
         }
     }
 }
@@ -1196,29 +1213,27 @@ impl Table {
             .collect()
     }
 
-    fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
+    fn new_append_writer(&self) -> ffi::FfiPtrResult {
         let _enter = RUNTIME.enter();
 
-        let table_append = self
-            .fluss_table()
-            .new_append()
-            .map_err(|e| format!("Failed to create append: {e}"))?;
+        let table_append = match self.fluss_table().new_append() {
+            Ok(a) => a,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        let writer = table_append
-            .create_writer()
-            .map_err(|e| format!("Failed to create writer: {e}"))?;
+        let writer = match table_append.create_writer() {
+            Ok(w) => w,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        Ok(Box::into_raw(Box::new(AppendWriter {
+        let ptr = Box::into_raw(Box::new(AppendWriter {
             inner: writer,
             table_info: self.table_info.clone(),
-        })))
+        }));
+        ok_ptr(ptr as usize)
     }
 
-    fn create_scanner(
-        &self,
-        column_indices: Vec<usize>,
-        batch: bool,
-    ) -> Result<*mut LogScanner, String> {
+    fn create_scanner(&self, column_indices: Vec<usize>, batch: bool) -> 
ffi::FfiPtrResult {
         RUNTIME.block_on(async {
             let fluss_table = self.fluss_table();
             let scan = fluss_table.new_scan();
@@ -1226,29 +1241,34 @@ impl Table {
             let (projected_columns, scan) = if column_indices.is_empty() {
                 (self.table_info.get_schema().columns().to_vec(), scan)
             } else {
-                let cols = self.resolve_projected_columns(&column_indices)?;
-                let scan = scan
-                    .project(&column_indices)
-                    .map_err(|e| format!("Failed to project columns: {e}"))?;
+                let cols = match 
self.resolve_projected_columns(&column_indices) {
+                    Ok(c) => c,
+                    Err(e) => return client_err_ptr(e),
+                };
+                let scan = match scan.project(&column_indices) {
+                    Ok(s) => s,
+                    Err(e) => return err_ptr_from_core(&e),
+                };
                 (cols, scan)
             };
 
             let scanner = if batch {
-                let s = scan
-                    .create_record_batch_log_scanner()
-                    .map_err(|e| format!("Failed to create record batch log 
scanner: {e}"))?;
-                ScannerKind::Batch(s)
+                match scan.create_record_batch_log_scanner() {
+                    Ok(s) => ScannerKind::Batch(s),
+                    Err(e) => return err_ptr_from_core(&e),
+                }
             } else {
-                let s = scan
-                    .create_log_scanner()
-                    .map_err(|e| format!("Failed to create log scanner: 
{e}"))?;
-                ScannerKind::Record(s)
+                match scan.create_log_scanner() {
+                    Ok(s) => ScannerKind::Record(s),
+                    Err(e) => return err_ptr_from_core(&e),
+                }
             };
 
-            Ok(Box::into_raw(Box::new(LogScanner {
+            let ptr = Box::into_raw(Box::new(LogScanner {
                 scanner,
                 projected_columns,
-            })))
+            }));
+            ok_ptr(ptr as usize)
         })
     }
 
@@ -1267,51 +1287,53 @@ impl Table {
         self.has_pk
     }
 
-    fn create_upsert_writer(
-        &self,
-        column_indices: Vec<usize>,
-    ) -> Result<*mut UpsertWriter, String> {
+    fn create_upsert_writer(&self, column_indices: Vec<usize>) -> 
ffi::FfiPtrResult {
         let _enter = RUNTIME.enter();
 
-        let table_upsert = self
-            .fluss_table()
-            .new_upsert()
-            .map_err(|e| format!("Failed to create upsert: {e}"))?;
+        let table_upsert = match self.fluss_table().new_upsert() {
+            Ok(u) => u,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
         let table_upsert = if column_indices.is_empty() {
             table_upsert
         } else {
-            table_upsert
-                .partial_update(Some(column_indices))
-                .map_err(|e| format!("Failed to set partial update columns: 
{e}"))?
+            match table_upsert.partial_update(Some(column_indices)) {
+                Ok(u) => u,
+                Err(e) => return err_ptr_from_core(&e),
+            }
         };
 
-        let writer = table_upsert
-            .create_writer()
-            .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
+        let writer = match table_upsert.create_writer() {
+            Ok(w) => w,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        Ok(Box::into_raw(Box::new(UpsertWriter {
+        let ptr = Box::into_raw(Box::new(UpsertWriter {
             inner: writer,
             table_info: self.table_info.clone(),
-        })))
+        }));
+        ok_ptr(ptr as usize)
     }
 
-    fn new_lookuper(&self) -> Result<*mut Lookuper, String> {
+    fn new_lookuper(&self) -> ffi::FfiPtrResult {
         let _enter = RUNTIME.enter();
 
-        let table_lookup = self
-            .fluss_table()
-            .new_lookup()
-            .map_err(|e| format!("Failed to create lookup: {e}"))?;
+        let table_lookup = match self.fluss_table().new_lookup() {
+            Ok(l) => l,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        let lookuper = table_lookup
-            .create_lookuper()
-            .map_err(|e| format!("Failed to create lookuper: {e}"))?;
+        let lookuper = match table_lookup.create_lookuper() {
+            Ok(l) => l,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        Ok(Box::into_raw(Box::new(Lookuper {
+        let ptr = Box::into_raw(Box::new(Lookuper {
             inner: lookuper,
             table_info: self.table_info.clone(),
-        })))
+        }));
+        ok_ptr(ptr as usize)
     }
 }
 
@@ -1325,26 +1347,25 @@ unsafe fn delete_append_writer(writer: *mut 
AppendWriter) {
 }
 
 impl AppendWriter {
-    fn append(&mut self, row: &GenericRowInner) -> Result<Box<WriteResult>, 
String> {
+    fn append(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
         let schema = self.table_info.get_schema();
-        let generic_row =
-            types::resolve_row_types(&row.row, Some(schema)).map_err(|e| 
e.to_string())?;
+        let generic_row = match types::resolve_row_types(&row.row, 
Some(schema)) {
+            Ok(r) => r,
+            Err(e) => return client_err_ptr(e.to_string()),
+        };
 
-        let result_future = self
-            .inner
-            .append(&generic_row)
-            .map_err(|e| format!("Failed to append: {e}"))?;
+        let result_future = match self.inner.append(&generic_row) {
+            Ok(f) => f,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        Ok(Box::new(WriteResult {
+        let ptr = Box::into_raw(Box::new(WriteResult {
             inner: Some(result_future),
-        }))
+        }));
+        ok_ptr(ptr as usize)
     }
 
-    fn append_arrow_batch(
-        &mut self,
-        array_ptr: usize,
-        schema_ptr: usize,
-    ) -> Result<Box<WriteResult>, String> {
+    fn append_arrow_batch(&mut self, array_ptr: usize, schema_ptr: usize) -> 
ffi::FfiPtrResult {
         use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
 
         // Safety: C++ allocates these via `new ArrowArray/ArrowSchema` after a
@@ -1355,22 +1376,25 @@ impl AppendWriter {
 
         // Safety: `from_ffi` requires that the array and schema conform to the
         // Arrow C Data Interface, which is guaranteed by C++'s 
ExportRecordBatch.
-        let array_data = unsafe { arrow::ffi::from_ffi(ffi_array, &ffi_schema) 
}
-            .map_err(|e| format!("Failed to import Arrow batch: {e}"))?;
+        let array_data = match unsafe { arrow::ffi::from_ffi(ffi_array, 
&ffi_schema) } {
+            Ok(d) => d,
+            Err(e) => return client_err_ptr(format!("Failed to import Arrow 
batch: {e}")),
+        };
         // ffi_array is consumed by from_ffi; ffi_schema is dropped here (Box 
goes out of scope)
 
         // Reconstruct RecordBatch from the imported StructArray data
         let struct_array = arrow::array::StructArray::from(array_data);
         let batch = arrow::record_batch::RecordBatch::from(struct_array);
 
-        let result_future = self
-            .inner
-            .append_arrow_batch(batch)
-            .map_err(|e| format!("Failed to append Arrow batch: {e}"))?;
+        let result_future = match self.inner.append_arrow_batch(batch) {
+            Ok(f) => f,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        Ok(Box::new(WriteResult {
+        let ptr = Box::into_raw(Box::new(WriteResult {
             inner: Some(result_future),
-        }))
+        }));
+        ok_ptr(ptr as usize)
     }
 
     fn flush(&mut self) -> ffi::FfiResult {
@@ -1383,6 +1407,14 @@ impl AppendWriter {
     }
 }
 
+unsafe fn delete_write_result(wr: *mut WriteResult) {
+    if !wr.is_null() {
+        unsafe {
+            drop(Box::from_raw(wr));
+        }
+    }
+}
+
 impl WriteResult {
     fn wait(&mut self) -> ffi::FfiResult {
         if let Some(future) = self.inner.take() {
@@ -1417,36 +1449,42 @@ impl UpsertWriter {
         row
     }
 
-    fn upsert(&mut self, row: &GenericRowInner) -> Result<Box<WriteResult>, 
String> {
+    fn upsert(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
         let schema = self.table_info.get_schema();
-        let generic_row =
-            types::resolve_row_types(&row.row, Some(schema)).map_err(|e| 
e.to_string())?;
+        let generic_row = match types::resolve_row_types(&row.row, 
Some(schema)) {
+            Ok(r) => r,
+            Err(e) => return client_err_ptr(e.to_string()),
+        };
         let generic_row = self.pad_row(generic_row);
 
-        let result_future = self
-            .inner
-            .upsert(&generic_row)
-            .map_err(|e| format!("Failed to upsert: {e}"))?;
+        let result_future = match self.inner.upsert(&generic_row) {
+            Ok(f) => f,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        Ok(Box::new(WriteResult {
+        let ptr = Box::into_raw(Box::new(WriteResult {
             inner: Some(result_future),
-        }))
+        }));
+        ok_ptr(ptr as usize)
     }
 
-    fn delete_row(&mut self, row: &GenericRowInner) -> 
Result<Box<WriteResult>, String> {
+    fn delete_row(&mut self, row: &GenericRowInner) -> ffi::FfiPtrResult {
         let schema = self.table_info.get_schema();
-        let generic_row =
-            types::resolve_row_types(&row.row, Some(schema)).map_err(|e| 
e.to_string())?;
+        let generic_row = match types::resolve_row_types(&row.row, 
Some(schema)) {
+            Ok(r) => r,
+            Err(e) => return client_err_ptr(e.to_string()),
+        };
         let generic_row = self.pad_row(generic_row);
 
-        let result_future = self
-            .inner
-            .delete(&generic_row)
-            .map_err(|e| format!("Failed to delete: {e}"))?;
+        let result_future = match self.inner.delete(&generic_row) {
+            Ok(f) => f,
+            Err(e) => return err_ptr_from_core(&e),
+        };
 
-        Ok(Box::new(WriteResult {
+        let ptr = Box::into_raw(Box::new(WriteResult {
             inner: Some(result_future),
-        }))
+        }));
+        ok_ptr(ptr as usize)
     }
 
     fn upsert_flush(&mut self) -> ffi::FfiResult {
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index b0b7029..c49a644 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -578,14 +578,12 @@ Result TableAppend::CreateWriter(AppendWriter& out) {
         return utils::make_client_error("Table not available");
     }
 
-    try {
-        out = AppendWriter(table_->new_append_writer());
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
-    } catch (const std::exception& e) {
-        return utils::make_client_error(e.what());
+    auto ffi_result = table_->new_append_writer();
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = AppendWriter(utils::ptr_from_ffi<ffi::AppendWriter>(ffi_result));
     }
+    return result;
 }
 
 // ============================================================================
@@ -645,11 +643,14 @@ Result TableUpsert::CreateWriter(UpsertWriter& out) {
         for (size_t idx : resolved_indices) {
             rust_indices.push_back(idx);
         }
-        out = 
UpsertWriter(table_->create_upsert_writer(std::move(rust_indices)));
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
+        auto ffi_result = 
table_->create_upsert_writer(std::move(rust_indices));
+        auto result = utils::from_ffi_result(ffi_result.result);
+        if (result.Ok()) {
+            out = 
UpsertWriter(utils::ptr_from_ffi<ffi::UpsertWriter>(ffi_result));
+        }
+        return result;
     } catch (const std::exception& e) {
+        // ResolveNameProjection() may throw
         return utils::make_client_error(e.what());
     }
 }
@@ -665,14 +666,12 @@ Result TableLookup::CreateLookuper(Lookuper& out) {
         return utils::make_client_error("Table not available");
     }
 
-    try {
-        out = Lookuper(table_->new_lookuper());
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
-    } catch (const std::exception& e) {
-        return utils::make_client_error(e.what());
+    auto ffi_result = table_->new_lookuper();
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = Lookuper(utils::ptr_from_ffi<ffi::Lookuper>(ffi_result));
     }
+    return result;
 }
 
 // ============================================================================
@@ -731,11 +730,14 @@ Result TableScan::DoCreateScanner(LogScanner& out, bool 
is_record_batch) {
         for (size_t idx : resolved_indices) {
             rust_indices.push_back(idx);
         }
-        out.scanner_ = table_->create_scanner(std::move(rust_indices), 
is_record_batch);
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
+        auto ffi_result = table_->create_scanner(std::move(rust_indices), 
is_record_batch);
+        auto result = utils::from_ffi_result(ffi_result.result);
+        if (result.Ok()) {
+            out.scanner_ = utils::ptr_from_ffi<ffi::LogScanner>(ffi_result);
+        }
+        return result;
     } catch (const std::exception& e) {
+        // ResolveNameProjection() may throw
         return utils::make_client_error(e.what());
     }
 }
@@ -752,8 +754,7 @@ WriteResult::~WriteResult() noexcept { Destroy(); }
 
 void WriteResult::Destroy() noexcept {
     if (inner_) {
-        // Reconstruct the rust::Box to let Rust drop the value
-        rust::Box<ffi::WriteResult>::from_raw(inner_);
+        ffi::delete_write_result(inner_);
         inner_ = nullptr;
     }
 }
@@ -827,15 +828,12 @@ Result AppendWriter::Append(const GenericRow& row, 
WriteResult& out) {
         return utils::make_client_error("GenericRow not available");
     }
 
-    try {
-        auto rust_box = writer_->append(*row.inner_);
-        out = WriteResult(rust_box.into_raw());
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
-    } catch (const std::exception& e) {
-        return utils::make_client_error(e.what());
+    auto ffi_result = writer_->append(*row.inner_);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = WriteResult(utils::ptr_from_ffi<ffi::WriteResult>(ffi_result));
     }
+    return result;
 }
 
 Result AppendWriter::AppendArrowBatch(const 
std::shared_ptr<arrow::RecordBatch>& batch) {
@@ -864,19 +862,16 @@ Result AppendWriter::AppendArrowBatch(const 
std::shared_ptr<arrow::RecordBatch>&
     auto* array_heap = new ArrowArray(std::move(c_array));
     auto* schema_heap = new ArrowSchema(std::move(c_schema));
 
-    try {
-        // Rust takes ownership of both pointers immediately via 
Box::from_raw(),
-        // so after this call (success or exception) C++ must NOT free them.
-        auto result_box = 
writer_->append_arrow_batch(reinterpret_cast<size_t>(array_heap),
-                                                      
reinterpret_cast<size_t>(schema_heap));
+    // Rust takes ownership of both pointers immediately via Box::from_raw(),
+    // so after this call C++ must NOT free them.
+    auto ffi_result = 
writer_->append_arrow_batch(reinterpret_cast<size_t>(array_heap),
+                                                  
reinterpret_cast<size_t>(schema_heap));
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
         out.Destroy();
-        out.inner_ = result_box.into_raw();
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(std::string(e.what()));
-    } catch (const std::exception& e) {
-        return utils::make_client_error(std::string(e.what()));
+        out.inner_ = utils::ptr_from_ffi<ffi::WriteResult>(ffi_result);
     }
+    return result;
 }
 
 Result AppendWriter::Flush() {
@@ -933,15 +928,12 @@ Result UpsertWriter::Upsert(const GenericRow& row, 
WriteResult& out) {
         return utils::make_client_error("GenericRow not available");
     }
 
-    try {
-        auto rust_box = writer_->upsert(*row.inner_);
-        out = WriteResult(rust_box.into_raw());
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
-    } catch (const std::exception& e) {
-        return utils::make_client_error(e.what());
+    auto ffi_result = writer_->upsert(*row.inner_);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = WriteResult(utils::ptr_from_ffi<ffi::WriteResult>(ffi_result));
     }
+    return result;
 }
 
 Result UpsertWriter::Delete(const GenericRow& row) {
@@ -957,15 +949,12 @@ Result UpsertWriter::Delete(const GenericRow& row, 
WriteResult& out) {
         return utils::make_client_error("GenericRow not available");
     }
 
-    try {
-        auto rust_box = writer_->delete_row(*row.inner_);
-        out = WriteResult(rust_box.into_raw());
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_client_error(e.what());
-    } catch (const std::exception& e) {
-        return utils::make_client_error(e.what());
+    auto ffi_result = writer_->delete_row(*row.inner_);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = WriteResult(utils::ptr_from_ffi<ffi::WriteResult>(ffi_result));
     }
+    return result;
 }
 
 Result UpsertWriter::Flush() {
diff --git a/bindings/cpp/test/test_sasl_auth.cpp b/bindings/cpp/test/test_sasl_auth.cpp
index 2208db3..5a52a1a 100644
--- a/bindings/cpp/test/test_sasl_auth.cpp
+++ b/bindings/cpp/test/test_sasl_auth.cpp
@@ -89,8 +89,7 @@ TEST_F(SaslAuthTest, SaslConnectWithWrongPassword) {
     fluss::Connection conn;
     auto result = fluss::Connection::Create(config, conn);
     ASSERT_FALSE(result.Ok());
-    // TODO: error_code is CLIENT_ERROR (-2) because CXX Result<*mut T> loses 
the server
-    // error code. Should be AUTHENTICATE_EXCEPTION (46) once fixed
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::AUTHENTICATE_EXCEPTION);
     EXPECT_NE(result.error_message.find("Authentication failed"), 
std::string::npos)
         << "Expected 'Authentication failed' in: " << result.error_message;
 }
@@ -106,7 +105,7 @@ TEST_F(SaslAuthTest, SaslConnectWithUnknownUser) {
     fluss::Connection conn;
     auto result = fluss::Connection::Create(config, conn);
     ASSERT_FALSE(result.Ok());
-    // TODO: same as above — should check error_code == AUTHENTICATE_EXCEPTION 
once fixed.
+    EXPECT_EQ(result.error_code, fluss::ErrorCode::AUTHENTICATE_EXCEPTION);
     EXPECT_NE(result.error_message.find("Authentication failed"), 
std::string::npos)
         << "Expected 'Authentication failed' in: " << result.error_message;
 }

Reply via email to