This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch apache-rusty
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/apache-rusty by this push:
     new 6e41a7609 MINIFICPP-2731 Allow cancellation from ProcessSession::write
6e41a7609 is described below

commit 6e41a76096c1ff06285ae7d45948d5ee41b322c8
Author: Martin Zink <[email protected]>
AuthorDate: Tue Feb 24 23:28:32 2026 +0100

    MINIFICPP-2731 Allow cancellation from ProcessSession::write
---
 .../include/core/BufferedContentSession.h          |   2 +
 core-framework/src/core/BufferedContentSession.cpp |   5 +
 .../tests/DBContentRepositoryTests.cpp             |   6 ++
 libminifi/include/core/ForwardingContentSession.h  |   2 +
 libminifi/src/core/ForwardingContentSession.cpp    |   6 ++
 libminifi/src/core/ProcessSession.cpp              |  15 ++-
 .../libtest/unit/ContentRepositoryDependentTests.h | 105 +++++++++++++++++----
 libminifi/test/unit/ProcessSessionTests.cpp        |  15 ++-
 minifi-api/include/minifi-c/minifi-c.h             |   5 +
 .../include/minifi-cpp/core/ContentSession.h       |   2 +
 10 files changed, 137 insertions(+), 26 deletions(-)

diff --git a/core-framework/include/core/BufferedContentSession.h 
b/core-framework/include/core/BufferedContentSession.h
index f98987735..c32fb1e5d 100644
--- a/core-framework/include/core/BufferedContentSession.h
+++ b/core-framework/include/core/BufferedContentSession.h
@@ -44,6 +44,8 @@ class BufferedContentSession : public ContentSessionImpl {
 
   std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& 
resource_id) override;
 
+  void remove(const std::shared_ptr<ResourceClaim>& resource_id) override;
+
   void commit() override;
 
   void rollback() override;
diff --git a/core-framework/src/core/BufferedContentSession.cpp 
b/core-framework/src/core/BufferedContentSession.cpp
index 418da5c16..7fbb90f84 100644
--- a/core-framework/src/core/BufferedContentSession.cpp
+++ b/core-framework/src/core/BufferedContentSession.cpp
@@ -95,6 +95,11 @@ void BufferedContentSession::commit() {
   append_state_.clear();
 }
 
+void BufferedContentSession::remove(const std::shared_ptr<ResourceClaim>& 
resource_id) {
+  managed_resources_.erase(resource_id);
+  append_state_.erase(resource_id);
+}
+
 void BufferedContentSession::rollback() {
   managed_resources_.clear();
   append_state_.clear();
diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp 
b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
index 50195748b..f7e6fe04a 100644
--- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
+++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
@@ -317,6 +317,12 @@ TEST_CASE("ProcessSession::read can read zero length 
flowfiles without crash (Ro
   
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
 }
 
+TEST_CASE("ProcessSession::write can be cancelled") {
+  
ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::DatabaseContentRepository>());
+  
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::DatabaseContentRepository>());
+  
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::DatabaseContentRepository>());
+}
+
 size_t getDbSize(const std::filesystem::path& dir) {
   auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string(), {});
   auto opendb = db->open();
diff --git a/libminifi/include/core/ForwardingContentSession.h 
b/libminifi/include/core/ForwardingContentSession.h
index 620c71fa3..2699ee745 100644
--- a/libminifi/include/core/ForwardingContentSession.h
+++ b/libminifi/include/core/ForwardingContentSession.h
@@ -43,6 +43,8 @@ class ForwardingContentSession : public ContentSessionImpl {
 
   std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& 
resource_id) override;
 
+  void remove(const std::shared_ptr<ResourceClaim>& resource_id) override;
+
   void commit() override;
 
   void rollback() override;
diff --git a/libminifi/src/core/ForwardingContentSession.cpp 
b/libminifi/src/core/ForwardingContentSession.cpp
index cc7999ee8..17c715f37 100644
--- a/libminifi/src/core/ForwardingContentSession.cpp
+++ b/libminifi/src/core/ForwardingContentSession.cpp
@@ -51,6 +51,12 @@ std::shared_ptr<io::BaseStream> 
ForwardingContentSession::append(const std::shar
   return repository_->write(*resource_id, true);
 }
 
+void ForwardingContentSession::remove(const std::shared_ptr<ResourceClaim>& 
resource_id) {
+  created_claims_.erase(resource_id);
+  append_state_.erase(resource_id);
+  repository_->remove(*resource_id);
+}
+
 void ForwardingContentSession::commit() {
   created_claims_.clear();
   append_state_.clear();
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 576ed3a91..edf13c900 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -31,10 +31,11 @@
 #include <vector>
 
 #include "core/ProcessSessionReadCallback.h"
-#include "io/StreamSlice.h"
+#include "core/Processor.h"
 #include "io/StreamPipe.h"
+#include "io/StreamSlice.h"
+#include "minifi-c/minifi-c.h"
 #include "minifi-cpp/utils/gsl.h"
-#include "core/Processor.h"
 
 /* This implementation is only for native Windows systems.  */
 #if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__
@@ -256,7 +257,15 @@ void ProcessSessionImpl::write(core::FlowFile &flow, const 
io::OutputStreamCallb
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile 
content for write");
     }
-    if (callback(stream) < 0) {
+    const auto callback_result = callback(stream);
+    if (callback_result == MinifiIoStatus::MINIFI_IO_CANCEL) {
+      stream->close();
+      content_session_->remove(claim);
+      claim.reset();
+      return;
+    }
+
+    if (callback_result < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile 
content");
     }
 
diff --git a/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h 
b/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h
index f7d6bb710..b15bc2f47 100644
--- a/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h
@@ -16,19 +16,19 @@
  * limitations under the License.
  */
 
+#pragma once
+
 #include <array>
 #include <memory>
 #include <string>
-#include <vector>
 
 #include "catch2/catch_test_macros.hpp"
-#include "core/Processor.h"
 #include "core/ProcessSession.h"
-#include "core/Resource.h"
-#include "unit/TestBase.h"
-#include "unit/Catch.h"
-#include "unit/DummyProcessor.h"
+#include "core/Processor.h"
 #include "io/StreamPipe.h"
+#include "minifi-c/minifi-c.h"
+#include "unit/DummyProcessor.h"
+#include "unit/TestBase.h"
 
 #pragma once
 
@@ -42,11 +42,11 @@ struct ReadUntilItCan {
     std::array<std::byte, 1024> buffer{};
     size_t bytes_read = 0;
     while (true) {
-      size_t read_result = stream->read(buffer);
+      const size_t read_result = stream->read(buffer);
       if (minifi::io::isError(read_result))
         return -1;
       if (read_result == 0)
-        return bytes_read;
+        return gsl::narrow<int64_t>(bytes_read);
       bytes_read += read_result;
       const auto char_view = gsl::make_span(buffer).subspan(0, 
read_result).as_span<const char>();
       value_.append(std::begin(char_view), std::end(char_view));
@@ -69,18 +69,18 @@ class Fixture {
     process_session_ = std::make_unique<core::ProcessSessionImpl>(context_);
   }
 
-  core::ProcessSession &processSession() { return *process_session_; }
+  [[nodiscard]] core::ProcessSession& processSession() const { return 
*process_session_; }
 
-  void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) {
+  void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) 
const {
     process_session_->transfer(flow_file, Success);
     process_session_->commit();
   }
 
-  void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const 
std::string content) {
+  void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const 
std::string_view content) const {
     process_session_->writeBuffer(flow_file, content);
   }
 
-  void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, 
const std::string content_to_append) {
+  void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, 
const std::string_view content_to_append) const {
     process_session_->add(flow_file);
     process_session_->appendBuffer(flow_file, content_to_append);
   }
@@ -94,8 +94,8 @@ class Fixture {
   std::unique_ptr<core::ProcessSession> process_session_;
 };
 
-void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> 
content_repo) {
-  Fixture fixture = Fixture(std::move(content_repo));
+inline void 
testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> 
content_repo) {
+  auto fixture = Fixture(std::move(content_repo));
   core::ProcessSession& process_session = fixture.processSession();
   const auto original_ff = process_session.create();
   fixture.writeToFlowFile(original_ff, "foobar");
@@ -123,8 +123,8 @@ void 
testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
   CHECK(read_until_it_can_callback.value_ == "bar");
 }
 
-void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> 
content_repo) {
-  Fixture fixture = Fixture(std::move(content_repo));
+inline void 
testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> 
content_repo) {
+  auto fixture = Fixture(std::move(content_repo));
   core::ProcessSession& process_session = fixture.processSession();
   const auto flow_file = process_session.create();
   REQUIRE(flow_file);
@@ -142,8 +142,8 @@ void 
testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> cont
   CHECK(read_until_it_can_callback.value_ == "myfoobar");
 }
 
-void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> 
content_repo) {
-  Fixture fixture = Fixture(std::move(content_repo));
+inline void 
testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> 
content_repo) {
+  auto fixture = Fixture(std::move(content_repo));
   core::ProcessSession& process_session = fixture.processSession();
   const auto flow_file = process_session.create();
   REQUIRE(flow_file);
@@ -160,8 +160,8 @@ void 
testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> conten
   CHECK(read_until_it_can_callback.value_ == "myfoobar");
 }
 
-void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> 
content_repo) {
-  Fixture fixture = Fixture(std::move(content_repo));
+inline void 
testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> 
content_repo) {
+  const auto fixture = Fixture(std::move(content_repo));
   core::ProcessSession& process_session = fixture.processSession();
   const auto flow_file = process_session.create();
   REQUIRE(flow_file);
@@ -171,4 +171,69 @@ void 
testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> con
   REQUIRE_NOTHROW(process_session.readBuffer(flow_file));
   REQUIRE_NOTHROW(process_session.read(flow_file, ReadUntilItCan{}));
 }
+
+inline void testErrWrite(std::shared_ptr<core::ContentRepository> 
content_repo) {
+  const auto fixture = Fixture(std::move(content_repo));
+  core::ProcessSession& process_session = fixture.processSession();
+  const auto flow_file = process_session.create();
+  fixture.writeToFlowFile(flow_file, "original_content");
+
+  REQUIRE_THROWS(
+  process_session.write(flow_file, [](const 
std::shared_ptr<minifi::io::OutputStream>& output_stream) {
+    std::string str = "new_content";
+    output_stream->write(as_bytes(std::span(str)));
+    return MinifiIoStatus::MINIFI_IO_ERROR;
+  }));
+
+  fixture.transferAndCommit(flow_file);
+
+  CHECK(flow_file->getSize() == 16);
+  ReadUntilItCan read_until_it_can_callback;
+  const auto read_result = process_session.readBuffer(flow_file);
+  process_session.read(flow_file, std::ref(read_until_it_can_callback));
+  CHECK(to_string(read_result) == "original_content");
+}
+
+inline void testOkWrite(std::shared_ptr<core::ContentRepository> content_repo) 
{
+  const auto fixture = Fixture(std::move(content_repo));
+  core::ProcessSession& process_session = fixture.processSession();
+  const auto flow_file = process_session.create();
+  fixture.writeToFlowFile(flow_file, "original_content");
+
+  CHECK(flow_file->getSize() == 16);
+
+  process_session.write(flow_file, [](const 
std::shared_ptr<minifi::io::OutputStream>& output_stream) {
+    std::string str = "new_content";
+    return output_stream->write(as_bytes(std::span(str)));
+  });
+
+  fixture.transferAndCommit(flow_file);
+
+  CHECK(flow_file->getSize() == 11);
+  ReadUntilItCan read_until_it_can_callback;
+  const auto read_result = process_session.readBuffer(flow_file);
+  process_session.read(flow_file, std::ref(read_until_it_can_callback));
+  CHECK(to_string(read_result) == "new_content");
+}
+
+inline void testCancelWrite(std::shared_ptr<core::ContentRepository> 
content_repo) {
+  const auto fixture = Fixture(std::move(content_repo));
+  core::ProcessSession& process_session = fixture.processSession();
+  const auto flow_file = process_session.create();
+  fixture.writeToFlowFile(flow_file, "original_content");
+
+  process_session.write(flow_file, [](const 
std::shared_ptr<minifi::io::OutputStream>& output_stream) {
+    std::string str = "new_content";
+    output_stream->write(as_bytes(std::span(str)));
+    return MinifiIoStatus::MINIFI_IO_CANCEL;
+  });
+
+  fixture.transferAndCommit(flow_file);
+
+  CHECK(flow_file->getSize() == 16);
+  ReadUntilItCan read_until_it_can_callback;
+  const auto read_result = process_session.readBuffer(flow_file);
+  process_session.read(flow_file, std::ref(read_until_it_can_callback));
+  CHECK(to_string(read_result) == "original_content");
+}
 }  // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp 
b/libminifi/test/unit/ProcessSessionTests.cpp
index 1987b00d6..5bd4d08ff 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -20,7 +20,6 @@
 #include <string>
 
 #include "core/ProcessSession.h"
-#include "core/Resource.h"
 #include "unit/TestBase.h"
 #include "unit/Catch.h"
 #include "unit/ContentRepositoryDependentTests.h"
@@ -34,13 +33,13 @@ class Fixture {
  public:
   explicit Fixture(TestController::PlanConfig config = {}): 
plan_config_(std::move(config)) {}
 
-  minifi::core::ProcessSession &processSession() { return *process_session_; }
+  [[nodiscard]] minifi::core::ProcessSession& processSession() const { return 
*process_session_; }
 
  private:
   TestController test_controller_;
   TestController::PlanConfig plan_config_;
   std::shared_ptr<TestPlan> test_plan_ = 
test_controller_.createPlan(plan_config_);
-  minifi::core::Processor* dummy_processor_ = 
test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
+  [[maybe_unused]] minifi::core::Processor* dummy_processor_ = 
test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
   std::shared_ptr<minifi::core::ProcessContext> context_ = [this] { 
test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }();
   std::unique_ptr<minifi::core::ProcessSession> process_session_ = 
std::make_unique<core::ProcessSessionImpl>(context_);
 };
@@ -127,3 +126,13 @@ TEST_CASE("ProcessSession::read can read zero length 
flowfiles without crash", "
   
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::VolatileContentRepository>());
   
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::FileSystemRepository>());
 }
+
+TEST_CASE("Test ProcessSession::write's possible outcomes") {
+  
ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::VolatileContentRepository>());
+  
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::VolatileContentRepository>());
+  
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::VolatileContentRepository>());
+
+  
ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::FileSystemRepository>());
+  
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::FileSystemRepository>());
+  
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::FileSystemRepository>());
+}
diff --git a/minifi-api/include/minifi-c/minifi-c.h 
b/minifi-api/include/minifi-c/minifi-c.h
index 0b2a110c8..fe27ca565 100644
--- a/minifi-api/include/minifi-c/minifi-c.h
+++ b/minifi-api/include/minifi-c/minifi-c.h
@@ -48,6 +48,11 @@ extern "C" {
 
 typedef bool MinifiBool;
 
+typedef enum MinifiIoStatus : int64_t {
+  MINIFI_IO_ERROR = -1,
+  MINIFI_IO_CANCEL = -125
+} MinifiIoStatus;
+
 typedef enum MinifiInputRequirement : uint32_t {
   MINIFI_INPUT_REQUIRED = 0,
   MINIFI_INPUT_ALLOWED = 1,
diff --git a/minifi-api/include/minifi-cpp/core/ContentSession.h 
b/minifi-api/include/minifi-cpp/core/ContentSession.h
index ce12eab9f..6d2fdaa44 100644
--- a/minifi-api/include/minifi-cpp/core/ContentSession.h
+++ b/minifi-api/include/minifi-cpp/core/ContentSession.h
@@ -39,6 +39,8 @@ class ContentSession {
 
   virtual std::shared_ptr<io::BaseStream> read(const 
std::shared_ptr<ResourceClaim>& resource_id) = 0;
 
+  virtual void remove(const std::shared_ptr<ResourceClaim>& resource_id) = 0;
+
   virtual void commit() = 0;
 
   virtual void rollback() = 0;

Reply via email to