This is an automated email from the ASF dual-hosted git repository.
martinzink pushed a commit to branch apache-rusty
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/apache-rusty by this push:
new 6e41a7609 MINIFICPP-2731 Allow cancellation from ProcessSession::write
6e41a7609 is described below
commit 6e41a76096c1ff06285ae7d45948d5ee41b322c8
Author: Martin Zink <[email protected]>
AuthorDate: Tue Feb 24 23:28:32 2026 +0100
MINIFICPP-2731 Allow cancellation from ProcessSession::write
---
.../include/core/BufferedContentSession.h | 2 +
core-framework/src/core/BufferedContentSession.cpp | 5 +
.../tests/DBContentRepositoryTests.cpp | 6 ++
libminifi/include/core/ForwardingContentSession.h | 2 +
libminifi/src/core/ForwardingContentSession.cpp | 6 ++
libminifi/src/core/ProcessSession.cpp | 15 ++-
.../libtest/unit/ContentRepositoryDependentTests.h | 105 +++++++++++++++++----
libminifi/test/unit/ProcessSessionTests.cpp | 15 ++-
minifi-api/include/minifi-c/minifi-c.h | 5 +
.../include/minifi-cpp/core/ContentSession.h | 2 +
10 files changed, 137 insertions(+), 26 deletions(-)
diff --git a/core-framework/include/core/BufferedContentSession.h b/core-framework/include/core/BufferedContentSession.h
index f98987735..c32fb1e5d 100644
--- a/core-framework/include/core/BufferedContentSession.h
+++ b/core-framework/include/core/BufferedContentSession.h
@@ -44,6 +44,8 @@ class BufferedContentSession : public ContentSessionImpl {
std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>&
resource_id) override;
+ void remove(const std::shared_ptr<ResourceClaim>& resource_id) override;
+
void commit() override;
void rollback() override;
diff --git a/core-framework/src/core/BufferedContentSession.cpp b/core-framework/src/core/BufferedContentSession.cpp
index 418da5c16..7fbb90f84 100644
--- a/core-framework/src/core/BufferedContentSession.cpp
+++ b/core-framework/src/core/BufferedContentSession.cpp
@@ -95,6 +95,11 @@ void BufferedContentSession::commit() {
append_state_.clear();
}
+void BufferedContentSession::remove(const std::shared_ptr<ResourceClaim>&
resource_id) {
+ managed_resources_.erase(resource_id);
+ append_state_.erase(resource_id);
+}
+
void BufferedContentSession::rollback() {
managed_resources_.clear();
append_state_.clear();
diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
index 50195748b..f7e6fe04a 100644
--- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
+++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
@@ -317,6 +317,12 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash (Ro
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
}
+TEST_CASE("ProcessSession::write can be cancelled") {
+
ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::DatabaseContentRepository>());
+
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::DatabaseContentRepository>());
+
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::DatabaseContentRepository>());
+}
+
size_t getDbSize(const std::filesystem::path& dir) {
auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string(), {});
auto opendb = db->open();
diff --git a/libminifi/include/core/ForwardingContentSession.h b/libminifi/include/core/ForwardingContentSession.h
index 620c71fa3..2699ee745 100644
--- a/libminifi/include/core/ForwardingContentSession.h
+++ b/libminifi/include/core/ForwardingContentSession.h
@@ -43,6 +43,8 @@ class ForwardingContentSession : public ContentSessionImpl {
std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>&
resource_id) override;
+ void remove(const std::shared_ptr<ResourceClaim>& resource_id) override;
+
void commit() override;
void rollback() override;
diff --git a/libminifi/src/core/ForwardingContentSession.cpp b/libminifi/src/core/ForwardingContentSession.cpp
index cc7999ee8..17c715f37 100644
--- a/libminifi/src/core/ForwardingContentSession.cpp
+++ b/libminifi/src/core/ForwardingContentSession.cpp
@@ -51,6 +51,12 @@ std::shared_ptr<io::BaseStream> ForwardingContentSession::append(const std::shar
return repository_->write(*resource_id, true);
}
+void ForwardingContentSession::remove(const std::shared_ptr<ResourceClaim>&
resource_id) {
+ created_claims_.erase(resource_id);
+ append_state_.erase(resource_id);
+ repository_->remove(*resource_id);
+}
+
void ForwardingContentSession::commit() {
created_claims_.clear();
append_state_.clear();
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 576ed3a91..edf13c900 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -31,10 +31,11 @@
#include <vector>
#include "core/ProcessSessionReadCallback.h"
-#include "io/StreamSlice.h"
+#include "core/Processor.h"
#include "io/StreamPipe.h"
+#include "io/StreamSlice.h"
+#include "minifi-c/minifi-c.h"
#include "minifi-cpp/utils/gsl.h"
-#include "core/Processor.h"
/* This implementation is only for native Windows systems. */
#if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__
@@ -256,7 +257,15 @@ void ProcessSessionImpl::write(core::FlowFile &flow, const io::OutputStreamCallb
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile
content for write");
}
- if (callback(stream) < 0) {
+ const auto callback_result = callback(stream);
+ if (callback_result == MinifiIoStatus::MINIFI_IO_CANCEL) {
+ stream->close();
+ content_session_->remove(claim);
+ claim.reset();
+ return;
+ }
+
+ if (callback_result < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile
content");
}
diff --git a/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h b/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h
index f7d6bb710..b15bc2f47 100644
--- a/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h
@@ -16,19 +16,19 @@
* limitations under the License.
*/
+#pragma once
+
#include <array>
#include <memory>
#include <string>
-#include <vector>
#include "catch2/catch_test_macros.hpp"
-#include "core/Processor.h"
#include "core/ProcessSession.h"
-#include "core/Resource.h"
-#include "unit/TestBase.h"
-#include "unit/Catch.h"
-#include "unit/DummyProcessor.h"
+#include "core/Processor.h"
#include "io/StreamPipe.h"
+#include "minifi-c/minifi-c.h"
+#include "unit/DummyProcessor.h"
+#include "unit/TestBase.h"
#pragma once
@@ -42,11 +42,11 @@ struct ReadUntilItCan {
std::array<std::byte, 1024> buffer{};
size_t bytes_read = 0;
while (true) {
- size_t read_result = stream->read(buffer);
+ const size_t read_result = stream->read(buffer);
if (minifi::io::isError(read_result))
return -1;
if (read_result == 0)
- return bytes_read;
+ return gsl::narrow<int64_t>(bytes_read);
bytes_read += read_result;
const auto char_view = gsl::make_span(buffer).subspan(0,
read_result).as_span<const char>();
value_.append(std::begin(char_view), std::end(char_view));
@@ -69,18 +69,18 @@ class Fixture {
process_session_ = std::make_unique<core::ProcessSessionImpl>(context_);
}
- core::ProcessSession &processSession() { return *process_session_; }
+ [[nodiscard]] core::ProcessSession& processSession() const { return
*process_session_; }
- void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) {
+ void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file)
const {
process_session_->transfer(flow_file, Success);
process_session_->commit();
}
- void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const
std::string content) {
+ void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const
std::string_view content) const {
process_session_->writeBuffer(flow_file, content);
}
- void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file,
const std::string content_to_append) {
+ void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file,
const std::string_view content_to_append) const {
process_session_->add(flow_file);
process_session_->appendBuffer(flow_file, content_to_append);
}
@@ -94,8 +94,8 @@ class Fixture {
std::unique_ptr<core::ProcessSession> process_session_;
};
-void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository>
content_repo) {
- Fixture fixture = Fixture(std::move(content_repo));
+inline void
testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository>
content_repo) {
+ auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto original_ff = process_session.create();
fixture.writeToFlowFile(original_ff, "foobar");
@@ -123,8 +123,8 @@ void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
CHECK(read_until_it_can_callback.value_ == "bar");
}
-void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository>
content_repo) {
- Fixture fixture = Fixture(std::move(content_repo));
+inline void
testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository>
content_repo) {
+ auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
REQUIRE(flow_file);
@@ -142,8 +142,8 @@ void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> cont
CHECK(read_until_it_can_callback.value_ == "myfoobar");
}
-void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository>
content_repo) {
- Fixture fixture = Fixture(std::move(content_repo));
+inline void
testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository>
content_repo) {
+ auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
REQUIRE(flow_file);
@@ -160,8 +160,8 @@ void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> conten
CHECK(read_until_it_can_callback.value_ == "myfoobar");
}
-void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository>
content_repo) {
- Fixture fixture = Fixture(std::move(content_repo));
+inline void
testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository>
content_repo) {
+ const auto fixture = Fixture(std::move(content_repo));
core::ProcessSession& process_session = fixture.processSession();
const auto flow_file = process_session.create();
REQUIRE(flow_file);
@@ -171,4 +171,69 @@ void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> con
REQUIRE_NOTHROW(process_session.readBuffer(flow_file));
REQUIRE_NOTHROW(process_session.read(flow_file, ReadUntilItCan{}));
}
+
+inline void testErrWrite(std::shared_ptr<core::ContentRepository>
content_repo) {
+ const auto fixture = Fixture(std::move(content_repo));
+ core::ProcessSession& process_session = fixture.processSession();
+ const auto flow_file = process_session.create();
+ fixture.writeToFlowFile(flow_file, "original_content");
+
+ REQUIRE_THROWS(
+ process_session.write(flow_file, [](const
std::shared_ptr<minifi::io::OutputStream>& output_stream) {
+ std::string str = "new_content";
+ output_stream->write(as_bytes(std::span(str)));
+ return MinifiIoStatus::MINIFI_IO_ERROR;
+ }));
+
+ fixture.transferAndCommit(flow_file);
+
+ CHECK(flow_file->getSize() == 16);
+ ReadUntilItCan read_until_it_can_callback;
+ const auto read_result = process_session.readBuffer(flow_file);
+ process_session.read(flow_file, std::ref(read_until_it_can_callback));
+ CHECK(to_string(read_result) == "original_content");
+}
+
+inline void testOkWrite(std::shared_ptr<core::ContentRepository> content_repo)
{
+ const auto fixture = Fixture(std::move(content_repo));
+ core::ProcessSession& process_session = fixture.processSession();
+ const auto flow_file = process_session.create();
+ fixture.writeToFlowFile(flow_file, "original_content");
+
+ CHECK(flow_file->getSize() == 16);
+
+ process_session.write(flow_file, [](const
std::shared_ptr<minifi::io::OutputStream>& output_stream) {
+ std::string str = "new_content";
+ return output_stream->write(as_bytes(std::span(str)));
+ });
+
+ fixture.transferAndCommit(flow_file);
+
+ CHECK(flow_file->getSize() == 11);
+ ReadUntilItCan read_until_it_can_callback;
+ const auto read_result = process_session.readBuffer(flow_file);
+ process_session.read(flow_file, std::ref(read_until_it_can_callback));
+ CHECK(to_string(read_result) == "new_content");
+}
+
+inline void testCancelWrite(std::shared_ptr<core::ContentRepository>
content_repo) {
+ const auto fixture = Fixture(std::move(content_repo));
+ core::ProcessSession& process_session = fixture.processSession();
+ const auto flow_file = process_session.create();
+ fixture.writeToFlowFile(flow_file, "original_content");
+
+ process_session.write(flow_file, [](const
std::shared_ptr<minifi::io::OutputStream>& output_stream) {
+ std::string str = "new_content";
+ output_stream->write(as_bytes(std::span(str)));
+ return MinifiIoStatus::MINIFI_IO_CANCEL;
+ });
+
+ fixture.transferAndCommit(flow_file);
+
+ CHECK(flow_file->getSize() == 16);
+ ReadUntilItCan read_until_it_can_callback;
+ const auto read_result = process_session.readBuffer(flow_file);
+ process_session.read(flow_file, std::ref(read_until_it_can_callback));
+ CHECK(to_string(read_result) == "original_content");
+}
} // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index 1987b00d6..5bd4d08ff 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -20,7 +20,6 @@
#include <string>
#include "core/ProcessSession.h"
-#include "core/Resource.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "unit/ContentRepositoryDependentTests.h"
@@ -34,13 +33,13 @@ class Fixture {
public:
explicit Fixture(TestController::PlanConfig config = {}):
plan_config_(std::move(config)) {}
- minifi::core::ProcessSession &processSession() { return *process_session_; }
+ [[nodiscard]] minifi::core::ProcessSession& processSession() const { return
*process_session_; }
private:
TestController test_controller_;
TestController::PlanConfig plan_config_;
std::shared_ptr<TestPlan> test_plan_ =
test_controller_.createPlan(plan_config_);
- minifi::core::Processor* dummy_processor_ =
test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
+ [[maybe_unused]] minifi::core::Processor* dummy_processor_ =
test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
std::shared_ptr<minifi::core::ProcessContext> context_ = [this] {
test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }();
std::unique_ptr<minifi::core::ProcessSession> process_session_ =
std::make_unique<core::ProcessSessionImpl>(context_);
};
@@ -127,3 +126,13 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::VolatileContentRepository>());
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::FileSystemRepository>());
}
+
+TEST_CASE("Test ProcessSession::write's possible outcomes") {
+
ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::VolatileContentRepository>());
+
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::VolatileContentRepository>());
+
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::VolatileContentRepository>());
+
+
ContentRepositoryDependentTests::testOkWrite(std::make_shared<core::repository::FileSystemRepository>());
+
ContentRepositoryDependentTests::testErrWrite(std::make_shared<core::repository::FileSystemRepository>());
+
ContentRepositoryDependentTests::testCancelWrite(std::make_shared<core::repository::FileSystemRepository>());
+}
diff --git a/minifi-api/include/minifi-c/minifi-c.h b/minifi-api/include/minifi-c/minifi-c.h
index 0b2a110c8..fe27ca565 100644
--- a/minifi-api/include/minifi-c/minifi-c.h
+++ b/minifi-api/include/minifi-c/minifi-c.h
@@ -48,6 +48,11 @@ extern "C" {
typedef bool MinifiBool;
+typedef enum MinifiIoStatus : int64_t {
+ MINIFI_IO_ERROR = -1,
+ MINIFI_IO_CANCEL = -125
+} MinifiIoStatus;
+
typedef enum MinifiInputRequirement : uint32_t {
MINIFI_INPUT_REQUIRED = 0,
MINIFI_INPUT_ALLOWED = 1,
diff --git a/minifi-api/include/minifi-cpp/core/ContentSession.h b/minifi-api/include/minifi-cpp/core/ContentSession.h
index ce12eab9f..6d2fdaa44 100644
--- a/minifi-api/include/minifi-cpp/core/ContentSession.h
+++ b/minifi-api/include/minifi-cpp/core/ContentSession.h
@@ -39,6 +39,8 @@ class ContentSession {
virtual std::shared_ptr<io::BaseStream> read(const
std::shared_ptr<ResourceClaim>& resource_id) = 0;
+ virtual void remove(const std::shared_ptr<ResourceClaim>& resource_id) = 0;
+
virtual void commit() = 0;
virtual void rollback() = 0;