This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push: new be51e246c MINIFICPP-2230 MergeContent should yield if no ready bins and no incoming flowfiles be51e246c is described below commit be51e246cded86ced5a125bd240aa5ef74aca9fb Author: Martin Zink <martinz...@apache.org> AuthorDate: Wed Mar 27 14:24:25 2024 +0100 MINIFICPP-2230 MergeContent should yield if no ready bins and no incoming flowfiles Co-authored-by: Ferenc Gerlits <fgerl...@gmail.com> Co-authored-by: Gabor Gyimesi <gamezb...@gmail.com> Signed-off-by: Gabor Gyimesi <gamezb...@gmail.com> This closes #1748 --- extensions/libarchive/BinFiles.cpp | 16 +++++++++++++--- extensions/libarchive/BinFiles.h | 2 +- extensions/libarchive/tests/MergeFileTests.cpp | 24 ++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp index e2982973d..b0687d46e 100644 --- a/extensions/libarchive/BinFiles.cpp +++ b/extensions/libarchive/BinFiles.cpp @@ -223,11 +223,14 @@ bool BinFiles::resurrectFlowFiles(core::ProcessSession &session) { return had_failure; } -void BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) { +bool BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) { for (size_t i = 0; i < batchSize_; ++i) { auto flow = session.get(); if (flow == nullptr) { + if (i == 0) { // Batch didn't contain a single flowfile, we should yield if there are no ready bins either + return false; + } break; } @@ -242,6 +245,7 @@ void BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) { session.transfer(flow, Self); } session.commit(); + return true; } void BinFiles::processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, core::ProcessSession &session) { @@ -283,8 +287,14 @@ void BinFiles::onTrigger(core::ProcessContext& context, core::ProcessSession& se return; } - assumeOwnershipOfNextBatch(session); - processReadyBins(gatherReadyBins(context), session); + const bool 
valid_batch = assumeOwnershipOfNextBatch(session); + if (auto ready_bins = gatherReadyBins(context); ready_bins.empty()) { + if (!valid_batch) { + yield(); + } + } else { + processReadyBins(std::move(ready_bins), session); + } } void BinFiles::transferFlowsToFail(core::ProcessSession &session, std::unique_ptr<Bin> &bin) { diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h index ab33e2c64..3dbb8fe86 100644 --- a/extensions/libarchive/BinFiles.h +++ b/extensions/libarchive/BinFiles.h @@ -279,7 +279,7 @@ class BinFiles : public core::Processor { // Sort flow files retrieved from the flow file repository after restart to their respective bins bool resurrectFlowFiles(core::ProcessSession &session); - void assumeOwnershipOfNextBatch(core::ProcessSession &session); + bool assumeOwnershipOfNextBatch(core::ProcessSession &session); std::deque<std::unique_ptr<Bin>> gatherReadyBins(core::ProcessContext &context); void processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, core::ProcessSession &session); diff --git a/extensions/libarchive/tests/MergeFileTests.cpp b/extensions/libarchive/tests/MergeFileTests.cpp index a98a18cf2..8c3015712 100644 --- a/extensions/libarchive/tests/MergeFileTests.cpp +++ b/extensions/libarchive/tests/MergeFileTests.cpp @@ -36,6 +36,7 @@ #include "MergeContent.h" #include "processors/LogAttribute.h" #include "TestBase.h" +#include "SingleProcessorTestController.h" #include "Catch.h" #include "unit/ProvenanceTestHelper.h" #include "serialization/FlowFileV3Serializer.h" @@ -899,3 +900,26 @@ TEST_CASE_METHOD(MergeTestController, "Maximum Group Size is respected", "[testM REQUIRE(expiredFlowRecords.empty()); REQUIRE_FALSE(flow3); } + +TEST_CASE("Empty MergeContent yields") { + const auto merge_content = std::make_shared<minifi::processors::MergeContent>("mergeContent"); + + minifi::test::SingleProcessorTestController controller{merge_content}; + controller.trigger(); + + CHECK(merge_content->isYield()); +} + 
+TEST_CASE("Empty MergeContent doesnt yield when processing readybins") { + const auto merge_content = std::make_shared<minifi::processors::MergeContent>("mergeContent"); + + minifi::test::SingleProcessorTestController controller{merge_content}; + controller.plan->setProperty(merge_content, minifi::processors::MergeContent::MaxBinAge, "100ms"); + controller.plan->setProperty(merge_content, minifi::processors::MergeContent::MinEntries, "2"); + + auto first_trigger_results = controller.trigger("foo"); + CHECK_FALSE(merge_content->isYield()); + std::this_thread::sleep_for(100ms); + auto second_trigger_results = controller.trigger(); + CHECK_FALSE(merge_content->isYield()); +}