Copilot commented on code in PR #288:
URL: https://github.com/apache/fluss-rust/pull/288#discussion_r2780978916
##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
return utils::from_ffi_result(ffi_result);
}
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+ if (writer_) {
+ ffi::delete_upsert_writer(writer_);
+ writer_ = nullptr;
+ }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept :
writer_(other.writer_) {
+ other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ writer_ = other.writer_;
+ other.writer_ = nullptr;
+ }
+ return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+ WriteResult wr;
+ return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->upsert(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+ WriteResult wr;
+ return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->delete_row(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
Review Comment:
`UpsertWriter::Delete(const GenericRow&, WriteResult&)` assigns to
`out.inner_` without first releasing any existing `WriteResult` state, which
can leak if `out` is reused. Consider resetting/destroying `out` before
overwriting `out.inner_`.
##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
return utils::from_ffi_result(ffi_result);
}
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+ if (writer_) {
+ ffi::delete_upsert_writer(writer_);
+ writer_ = nullptr;
+ }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept :
writer_(other.writer_) {
+ other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ writer_ = other.writer_;
+ other.writer_ = nullptr;
+ }
+ return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+ WriteResult wr;
+ return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->upsert(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
Review Comment:
`UpsertWriter::Upsert(const GenericRow&, WriteResult&)` assigns to
`out.inner_` without first releasing any existing `WriteResult` state. If
callers reuse a `WriteResult` instance, this can leak the previously owned Rust
box. Consider calling `out.Destroy()`/resetting `out.inner_` before overwriting
it.
##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
return utils::from_ffi_result(ffi_result);
}
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+ if (writer_) {
+ ffi::delete_upsert_writer(writer_);
+ writer_ = nullptr;
+ }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept :
writer_(other.writer_) {
+ other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ writer_ = other.writer_;
+ other.writer_ = nullptr;
+ }
+ return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+ WriteResult wr;
+ return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->upsert(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+ WriteResult wr;
+ return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->delete_row(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Flush() {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ auto ffi_result = writer_->upsert_flush();
+ return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+ if (lookuper_) {
+ ffi::delete_lookuper(lookuper_);
+ lookuper_ = nullptr;
+ }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+ other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ lookuper_ = other.lookuper_;
+ other.lookuper_ = nullptr;
+ }
+ return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow&
out) {
+ if (!Available()) {
+ return utils::make_error(1, "Lookuper not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(pk_row);
+ auto ffi_result = lookuper_->lookup(ffi_row);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (!result.Ok()) {
+ found = false;
+ return result;
+ }
+ found = ffi_result.found;
+ if (found) {
+ out = utils::from_ffi_generic_row(ffi_result.row);
+ }
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ found = false;
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ found = false;
+ return utils::make_error(1, e.what());
+ }
+}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out.writer_ = table_->new_upsert_writer();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const
std::vector<std::string>& column_names) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ rust::Vec<rust::String> rust_names;
+ for (const auto& name : column_names) {
+ rust_names.push_back(rust::String(name));
+ }
+ out.writer_ =
table_->new_upsert_writer_with_column_names(std::move(rust_names));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>&
column_indices) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : column_indices) {
+ rust_indices.push_back(idx);
+ }
+ out.writer_ =
table_->new_upsert_writer_with_column_indices(std::move(rust_indices));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewLookuper(Lookuper& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out.lookuper_ = table_->new_lookuper();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
Review Comment:
`Table::NewLookuper` overwrites `out.lookuper_` without releasing any
existing lookuper held by `out`. Reusing a `Lookuper` instance would leak the
old handle. Consider calling `out.Destroy()` (or resetting) before assignment.
##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
return utils::from_ffi_result(ffi_result);
}
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+ if (writer_) {
+ ffi::delete_upsert_writer(writer_);
+ writer_ = nullptr;
+ }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept :
writer_(other.writer_) {
+ other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ writer_ = other.writer_;
+ other.writer_ = nullptr;
+ }
+ return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+ WriteResult wr;
+ return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->upsert(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+ WriteResult wr;
+ return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->delete_row(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Flush() {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ auto ffi_result = writer_->upsert_flush();
+ return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+ if (lookuper_) {
+ ffi::delete_lookuper(lookuper_);
+ lookuper_ = nullptr;
+ }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+ other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ lookuper_ = other.lookuper_;
+ other.lookuper_ = nullptr;
+ }
+ return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow&
out) {
+ if (!Available()) {
+ return utils::make_error(1, "Lookuper not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(pk_row);
+ auto ffi_result = lookuper_->lookup(ffi_row);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (!result.Ok()) {
+ found = false;
+ return result;
+ }
+ found = ffi_result.found;
+ if (found) {
+ out = utils::from_ffi_generic_row(ffi_result.row);
+ }
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ found = false;
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ found = false;
+ return utils::make_error(1, e.what());
+ }
+}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out.writer_ = table_->new_upsert_writer();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
Review Comment:
`Table::NewUpsertWriter(...)` overwrites `out.writer_` without first
releasing any existing writer held by `out`. If callers reuse an `UpsertWriter`
instance, this will leak the previously owned writer. Call `out.Destroy()` (or
otherwise delete/reset the existing handle) before assigning the new pointer.
##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
return utils::from_ffi_result(ffi_result);
}
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+ if (writer_) {
+ ffi::delete_upsert_writer(writer_);
+ writer_ = nullptr;
+ }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept :
writer_(other.writer_) {
+ other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ writer_ = other.writer_;
+ other.writer_ = nullptr;
+ }
+ return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+ WriteResult wr;
+ return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->upsert(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+ WriteResult wr;
+ return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->delete_row(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Flush() {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ auto ffi_result = writer_->upsert_flush();
+ return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+ if (lookuper_) {
+ ffi::delete_lookuper(lookuper_);
+ lookuper_ = nullptr;
+ }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+ other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ lookuper_ = other.lookuper_;
+ other.lookuper_ = nullptr;
+ }
+ return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow&
out) {
+ if (!Available()) {
+ return utils::make_error(1, "Lookuper not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(pk_row);
+ auto ffi_result = lookuper_->lookup(ffi_row);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (!result.Ok()) {
+ found = false;
+ return result;
+ }
+ found = ffi_result.found;
+ if (found) {
+ out = utils::from_ffi_generic_row(ffi_result.row);
+ }
+ return utils::make_ok();
Review Comment:
When `found` is false, `Lookup` leaves `out` unchanged. If callers reuse the
same `GenericRow` for multiple lookups, they may accidentally read stale data
from a previous successful lookup. Consider explicitly resetting `out` (e.g.,
`out = GenericRow{}`) when `found` is false or when an error occurs.
##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
return utils::from_ffi_result(ffi_result);
}
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+ if (writer_) {
+ ffi::delete_upsert_writer(writer_);
+ writer_ = nullptr;
+ }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept :
writer_(other.writer_) {
+ other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ writer_ = other.writer_;
+ other.writer_ = nullptr;
+ }
+ return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+ WriteResult wr;
+ return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->upsert(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+ WriteResult wr;
+ return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->delete_row(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Flush() {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ auto ffi_result = writer_->upsert_flush();
+ return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+ if (lookuper_) {
+ ffi::delete_lookuper(lookuper_);
+ lookuper_ = nullptr;
+ }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+ other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ lookuper_ = other.lookuper_;
+ other.lookuper_ = nullptr;
+ }
+ return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow&
out) {
+ if (!Available()) {
+ return utils::make_error(1, "Lookuper not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(pk_row);
+ auto ffi_result = lookuper_->lookup(ffi_row);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (!result.Ok()) {
+ found = false;
+ return result;
+ }
+ found = ffi_result.found;
+ if (found) {
+ out = utils::from_ffi_generic_row(ffi_result.row);
+ }
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ found = false;
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ found = false;
+ return utils::make_error(1, e.what());
+ }
+}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out.writer_ = table_->new_upsert_writer();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const
std::vector<std::string>& column_names) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ rust::Vec<rust::String> rust_names;
+ for (const auto& name : column_names) {
+ rust_names.push_back(rust::String(name));
+ }
+ out.writer_ =
table_->new_upsert_writer_with_column_names(std::move(rust_names));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>&
column_indices) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : column_indices) {
+ rust_indices.push_back(idx);
+ }
+ out.writer_ =
table_->new_upsert_writer_with_column_indices(std::move(rust_indices));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
Review Comment:
`Table::NewUpsertWriter(..., column_indices)` overwrites `out.writer_`
without releasing an existing writer, which can leak if `out` is reused.
Consider resetting/destroying `out` before assignment.
##########
bindings/cpp/include/fluss.hpp:
##########
@@ -593,8 +597,68 @@ struct GenericRow {
fields[idx] = Datum::DecimalString(value);
}
+ // ── Name-based setters (require schema — see Table::NewRow()) ───
+ void Set(const std::string& name, std::nullptr_t) {
SetNull(Resolve(name)); }
+ void Set(const std::string& name, bool v) { SetBool(Resolve(name), v); }
+ void Set(const std::string& name, int32_t v) { SetInt32(Resolve(name), v);
}
+ void Set(const std::string& name, int64_t v) { SetInt64(Resolve(name), v);
}
+ void Set(const std::string& name, float v) { SetFloat32(Resolve(name), v);
}
+ void Set(const std::string& name, double v) { SetFloat64(Resolve(name),
v); }
+ // const char* overload to prevent "string literal" → bool conversion
+ void Set(const std::string& name, const char* v) {
+ auto [idx, type] = ResolveColumn(name);
+ if (type == TypeId::Decimal) {
+ SetDecimal(idx, v);
+ } else {
+ SetString(idx, v);
+ }
+ }
+ void Set(const std::string& name, std::string v) {
+ auto [idx, type] = ResolveColumn(name);
+ if (type == TypeId::Decimal) {
+ SetDecimal(idx, v);
+ } else {
+ SetString(idx, std::move(v));
+ }
+ }
+ void Set(const std::string& name, std::vector<uint8_t> v) {
+ SetBytes(Resolve(name), std::move(v));
+ }
+ void Set(const std::string& name, fluss::Date d) { SetDate(Resolve(name),
d); }
+ void Set(const std::string& name, fluss::Time t) { SetTime(Resolve(name),
t); }
+ void Set(const std::string& name, fluss::Timestamp ts) {
+ auto [idx, type] = ResolveColumn(name);
+ if (type == TypeId::TimestampLtz) {
+ SetTimestampLtz(idx, ts);
+ } else {
+ SetTimestampNtz(idx, ts);
Review Comment:
`GenericRow::Set(const std::string&, fluss::Timestamp)` routes everything
that is not `TypeId::TimestampLtz` to `SetTimestampNtz`, including
non-timestamp columns. This can silently create a row with a timestamp datum
for (e.g.) a String/Int column, leading to confusing failures later. Consider
validating `type` is one of the timestamp type IDs and throwing a clear error
if the column is not a timestamp.
```suggestion
} else if (type == TypeId::TimestampNtz) {
SetTimestampNtz(idx, ts);
} else {
throw std::runtime_error(
"GenericRow: column '" + name +
"' is not a timestamp column for Set(Timestamp)");
```
##########
bindings/cpp/src/table.cpp:
##########
@@ -299,6 +321,209 @@ Result AppendWriter::Flush() {
return utils::from_ffi_result(ffi_result);
}
+// UpsertWriter implementation
+UpsertWriter::UpsertWriter() noexcept = default;
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+void UpsertWriter::Destroy() noexcept {
+ if (writer_) {
+ ffi::delete_upsert_writer(writer_);
+ writer_ = nullptr;
+ }
+}
+
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept :
writer_(other.writer_) {
+ other.writer_ = nullptr;
+}
+
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ writer_ = other.writer_;
+ other.writer_ = nullptr;
+ }
+ return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+ WriteResult wr;
+ return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->upsert(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+ WriteResult wr;
+ return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->delete_row(ffi_row);
+ out.inner_ = rust_box.into_raw();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Flush() {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ auto ffi_result = writer_->upsert_flush();
+ return utils::from_ffi_result(ffi_result);
+}
+
+// Lookuper implementation
+Lookuper::Lookuper() noexcept = default;
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+void Lookuper::Destroy() noexcept {
+ if (lookuper_) {
+ ffi::delete_lookuper(lookuper_);
+ lookuper_ = nullptr;
+ }
+}
+
+Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
+ other.lookuper_ = nullptr;
+}
+
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+ if (this != &other) {
+ Destroy();
+ lookuper_ = other.lookuper_;
+ other.lookuper_ = nullptr;
+ }
+ return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow&
out) {
+ if (!Available()) {
+ return utils::make_error(1, "Lookuper not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(pk_row);
+ auto ffi_result = lookuper_->lookup(ffi_row);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (!result.Ok()) {
+ found = false;
+ return result;
+ }
+ found = ffi_result.found;
+ if (found) {
+ out = utils::from_ffi_generic_row(ffi_result.row);
+ }
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ found = false;
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ found = false;
+ return utils::make_error(1, e.what());
+ }
+}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out.writer_ = table_->new_upsert_writer();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const
std::vector<std::string>& column_names) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ rust::Vec<rust::String> rust_names;
+ for (const auto& name : column_names) {
+ rust_names.push_back(rust::String(name));
+ }
+ out.writer_ =
table_->new_upsert_writer_with_column_names(std::move(rust_names));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
Review Comment:
`Table::NewUpsertWriter(..., column_names)` overwrites `out.writer_` without
releasing an existing writer, which can leak if `out` is reused. Consider
resetting/destroying `out` before assignment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]