Copilot commented on code in PR #288:
URL: https://github.com/apache/fluss-rust/pull/288#discussion_r2780978916


##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
     return utils::from_ffi_result(ffi_result);
 }
 
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+    if (writer_) {
+        ffi::delete_upsert_writer(writer_);
+        writer_ = nullptr;
+    }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept : 
writer_(other.writer_) {
+    other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        writer_ = other.writer_;
+        other.writer_ = nullptr;
+    }
+    return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+    WriteResult wr;
+    return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->upsert(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+    WriteResult wr;
+    return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->delete_row(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }

Review Comment:
   `UpsertWriter::Delete(const GenericRow&, WriteResult&)` assigns to 
`out.inner_` without first releasing any existing `WriteResult` state. If 
callers reuse a `WriteResult` instance, this can leak the previously owned Rust 
box. Consider resetting/destroying `out` before overwriting `out.inner_`.



##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
     return utils::from_ffi_result(ffi_result);
 }
 
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+    if (writer_) {
+        ffi::delete_upsert_writer(writer_);
+        writer_ = nullptr;
+    }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept : 
writer_(other.writer_) {
+    other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        writer_ = other.writer_;
+        other.writer_ = nullptr;
+    }
+    return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+    WriteResult wr;
+    return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->upsert(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }

Review Comment:
   `UpsertWriter::Upsert(const GenericRow&, WriteResult&)` assigns to 
`out.inner_` without first releasing any existing `WriteResult` state. If 
callers reuse a `WriteResult` instance, this can leak the previously owned Rust 
box. Consider calling `out.Destroy()`/resetting `out.inner_` before overwriting 
it.



##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
     return utils::from_ffi_result(ffi_result);
 }
 
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+    if (writer_) {
+        ffi::delete_upsert_writer(writer_);
+        writer_ = nullptr;
+    }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept : 
writer_(other.writer_) {
+    other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        writer_ = other.writer_;
+        other.writer_ = nullptr;
+    }
+    return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+    WriteResult wr;
+    return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->upsert(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+    WriteResult wr;
+    return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->delete_row(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Flush() {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    auto ffi_result = writer_->upsert_flush();
+    return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+    if (lookuper_) {
+        ffi::delete_lookuper(lookuper_);
+        lookuper_ = nullptr;
+    }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+    other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        lookuper_ = other.lookuper_;
+        other.lookuper_ = nullptr;
+    }
+    return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& 
out) {
+    if (!Available()) {
+        return utils::make_error(1, "Lookuper not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(pk_row);
+        auto ffi_result = lookuper_->lookup(ffi_row);
+        auto result = utils::from_ffi_result(ffi_result.result);
+        if (!result.Ok()) {
+            found = false;
+            return result;
+        }
+        found = ffi_result.found;
+        if (found) {
+            out = utils::from_ffi_generic_row(ffi_result.row);
+        }
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out.writer_ = table_->new_upsert_writer();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const 
std::vector<std::string>& column_names) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        rust::Vec<rust::String> rust_names;
+        for (const auto& name : column_names) {
+            rust_names.push_back(rust::String(name));
+        }
+        out.writer_ = 
table_->new_upsert_writer_with_column_names(std::move(rust_names));
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>& 
column_indices) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : column_indices) {
+            rust_indices.push_back(idx);
+        }
+        out.writer_ = 
table_->new_upsert_writer_with_column_indices(std::move(rust_indices));
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewLookuper(Lookuper& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out.lookuper_ = table_->new_lookuper();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}

Review Comment:
   `Table::NewLookuper` overwrites `out.lookuper_` without releasing any 
existing lookuper held by `out`. Reusing a `Lookuper` instance would leak the 
old handle. Consider calling `out.Destroy()` (or resetting) before assignment.



##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
     return utils::from_ffi_result(ffi_result);
 }
 
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+    if (writer_) {
+        ffi::delete_upsert_writer(writer_);
+        writer_ = nullptr;
+    }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept : 
writer_(other.writer_) {
+    other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        writer_ = other.writer_;
+        other.writer_ = nullptr;
+    }
+    return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+    WriteResult wr;
+    return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->upsert(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+    WriteResult wr;
+    return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->delete_row(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Flush() {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    auto ffi_result = writer_->upsert_flush();
+    return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+    if (lookuper_) {
+        ffi::delete_lookuper(lookuper_);
+        lookuper_ = nullptr;
+    }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+    other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        lookuper_ = other.lookuper_;
+        other.lookuper_ = nullptr;
+    }
+    return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& 
out) {
+    if (!Available()) {
+        return utils::make_error(1, "Lookuper not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(pk_row);
+        auto ffi_result = lookuper_->lookup(ffi_row);
+        auto result = utils::from_ffi_result(ffi_result.result);
+        if (!result.Ok()) {
+            found = false;
+            return result;
+        }
+        found = ffi_result.found;
+        if (found) {
+            out = utils::from_ffi_generic_row(ffi_result.row);
+        }
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out.writer_ = table_->new_upsert_writer();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}

Review Comment:
   `Table::NewUpsertWriter(...)` overwrites `out.writer_` without first 
releasing any existing writer held by `out`. If callers reuse an `UpsertWriter` 
instance, this will leak the previously owned writer. Call `out.Destroy()` (or 
otherwise delete/reset the existing handle) before assigning the new pointer.



##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
     return utils::from_ffi_result(ffi_result);
 }
 
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+    if (writer_) {
+        ffi::delete_upsert_writer(writer_);
+        writer_ = nullptr;
+    }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept : 
writer_(other.writer_) {
+    other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        writer_ = other.writer_;
+        other.writer_ = nullptr;
+    }
+    return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+    WriteResult wr;
+    return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->upsert(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+    WriteResult wr;
+    return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->delete_row(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Flush() {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    auto ffi_result = writer_->upsert_flush();
+    return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+    if (lookuper_) {
+        ffi::delete_lookuper(lookuper_);
+        lookuper_ = nullptr;
+    }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+    other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        lookuper_ = other.lookuper_;
+        other.lookuper_ = nullptr;
+    }
+    return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& 
out) {
+    if (!Available()) {
+        return utils::make_error(1, "Lookuper not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(pk_row);
+        auto ffi_result = lookuper_->lookup(ffi_row);
+        auto result = utils::from_ffi_result(ffi_result.result);
+        if (!result.Ok()) {
+            found = false;
+            return result;
+        }
+        found = ffi_result.found;
+        if (found) {
+            out = utils::from_ffi_generic_row(ffi_result.row);
+        }
+        return utils::make_ok();

Review Comment:
   When `found` is false, `Lookup` leaves `out` unchanged. If callers reuse the 
same `GenericRow` for multiple lookups, they may accidentally read stale data 
from a previous successful lookup. Consider explicitly resetting `out` (e.g., 
`out = GenericRow{}`) when `found` is false or when an error occurs.



##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
     return utils::from_ffi_result(ffi_result);
 }
 
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+    if (writer_) {
+        ffi::delete_upsert_writer(writer_);
+        writer_ = nullptr;
+    }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept : 
writer_(other.writer_) {
+    other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        writer_ = other.writer_;
+        other.writer_ = nullptr;
+    }
+    return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+    WriteResult wr;
+    return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->upsert(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+    WriteResult wr;
+    return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->delete_row(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Flush() {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    auto ffi_result = writer_->upsert_flush();
+    return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+    if (lookuper_) {
+        ffi::delete_lookuper(lookuper_);
+        lookuper_ = nullptr;
+    }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+    other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        lookuper_ = other.lookuper_;
+        other.lookuper_ = nullptr;
+    }
+    return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& 
out) {
+    if (!Available()) {
+        return utils::make_error(1, "Lookuper not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(pk_row);
+        auto ffi_result = lookuper_->lookup(ffi_row);
+        auto result = utils::from_ffi_result(ffi_result.result);
+        if (!result.Ok()) {
+            found = false;
+            return result;
+        }
+        found = ffi_result.found;
+        if (found) {
+            out = utils::from_ffi_generic_row(ffi_result.row);
+        }
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out.writer_ = table_->new_upsert_writer();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const 
std::vector<std::string>& column_names) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        rust::Vec<rust::String> rust_names;
+        for (const auto& name : column_names) {
+            rust_names.push_back(rust::String(name));
+        }
+        out.writer_ = 
table_->new_upsert_writer_with_column_names(std::move(rust_names));
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>& 
column_indices) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : column_indices) {
+            rust_indices.push_back(idx);
+        }
+        out.writer_ = 
table_->new_upsert_writer_with_column_indices(std::move(rust_indices));
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}

Review Comment:
   `Table::NewUpsertWriter(..., column_indices)` overwrites `out.writer_` 
without releasing any existing writer held by `out`. If callers reuse an 
`UpsertWriter` instance, this will leak the previously owned writer. Consider 
calling `out.Destroy()` (or otherwise resetting `out`) before assignment.



##########
bindings/cpp/include/fluss.hpp:
##########
@@ -593,8 +597,68 @@ struct GenericRow {
         fields[idx] = Datum::DecimalString(value);
     }
 
+    // ── Name-based setters (require schema — see Table::NewRow()) ───
+    void Set(const std::string& name, std::nullptr_t) { 
SetNull(Resolve(name)); }
+    void Set(const std::string& name, bool v) { SetBool(Resolve(name), v); }
+    void Set(const std::string& name, int32_t v) { SetInt32(Resolve(name), v); 
}
+    void Set(const std::string& name, int64_t v) { SetInt64(Resolve(name), v); 
}
+    void Set(const std::string& name, float v) { SetFloat32(Resolve(name), v); 
}
+    void Set(const std::string& name, double v) { SetFloat64(Resolve(name), 
v); }
+    // const char* overload to prevent "string literal" → bool conversion
+    void Set(const std::string& name, const char* v) {
+        auto [idx, type] = ResolveColumn(name);
+        if (type == TypeId::Decimal) {
+            SetDecimal(idx, v);
+        } else {
+            SetString(idx, v);
+        }
+    }
+    void Set(const std::string& name, std::string v) {
+        auto [idx, type] = ResolveColumn(name);
+        if (type == TypeId::Decimal) {
+            SetDecimal(idx, v);
+        } else {
+            SetString(idx, std::move(v));
+        }
+    }
+    void Set(const std::string& name, std::vector<uint8_t> v) {
+        SetBytes(Resolve(name), std::move(v));
+    }
+    void Set(const std::string& name, fluss::Date d) { SetDate(Resolve(name), 
d); }
+    void Set(const std::string& name, fluss::Time t) { SetTime(Resolve(name), 
t); }
+    void Set(const std::string& name, fluss::Timestamp ts) {
+        auto [idx, type] = ResolveColumn(name);
+        if (type == TypeId::TimestampLtz) {
+            SetTimestampLtz(idx, ts);
+        } else {
+            SetTimestampNtz(idx, ts);

Review Comment:
   `GenericRow::Set(const std::string&, fluss::Timestamp)` routes everything 
that is not `TypeId::TimestampLtz` to `SetTimestampNtz`, including 
non-timestamp columns. This can silently create a row with a timestamp datum 
for (e.g.) a String/Int column, leading to confusing failures later. Consider 
validating `type` is one of the timestamp type IDs and throwing a clear error 
if the column is not a timestamp.
   ```suggestion
           } else if (type == TypeId::TimestampNtz) {
               SetTimestampNtz(idx, ts);
           } else {
               throw std::runtime_error(
                   "GenericRow: column '" + name +
                   "' is not a timestamp column for Set(Timestamp)");
   ```



##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
     return utils::from_ffi_result(ffi_result);
 }
 
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+    if (writer_) {
+        ffi::delete_upsert_writer(writer_);
+        writer_ = nullptr;
+    }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept : 
writer_(other.writer_) {
+    other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        writer_ = other.writer_;
+        other.writer_ = nullptr;
+    }
+    return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+    WriteResult wr;
+    return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->upsert(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+    WriteResult wr;
+    return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(row);
+        auto rust_box = writer_->delete_row(ffi_row);
+        out.inner_ = rust_box.into_raw();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result UpsertWriter::Flush() {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    auto ffi_result = writer_->upsert_flush();
+    return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+    if (lookuper_) {
+        ffi::delete_lookuper(lookuper_);
+        lookuper_ = nullptr;
+    }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+    other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        lookuper_ = other.lookuper_;
+        other.lookuper_ = nullptr;
+    }
+    return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& 
out) {
+    if (!Available()) {
+        return utils::make_error(1, "Lookuper not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(pk_row);
+        auto ffi_result = lookuper_->lookup(ffi_row);
+        auto result = utils::from_ffi_result(ffi_result.result);
+        if (!result.Ok()) {
+            found = false;
+            return result;
+        }
+        found = ffi_result.found;
+        if (found) {
+            out = utils::from_ffi_generic_row(ffi_result.row);
+        }
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out.writer_ = table_->new_upsert_writer();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const 
std::vector<std::string>& column_names) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        rust::Vec<rust::String> rust_names;
+        for (const auto& name : column_names) {
+            rust_names.push_back(rust::String(name));
+        }
+        out.writer_ = 
table_->new_upsert_writer_with_column_names(std::move(rust_names));
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}

Review Comment:
   `Table::NewUpsertWriter(..., column_names)` overwrites `out.writer_` without 
releasing any existing writer held by `out`. If callers reuse an `UpsertWriter` 
instance, this will leak the previously owned writer. Consider calling 
`out.Destroy()` (or otherwise resetting `out`) before assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to