This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new be51e246c MINIFICPP-2230 MergeContent should yield if no ready bins and no incoming flowfiles
be51e246c is described below

commit be51e246cded86ced5a125bd240aa5ef74aca9fb
Author: Martin Zink <martinz...@apache.org>
AuthorDate: Wed Mar 27 14:24:25 2024 +0100

    MINIFICPP-2230 MergeContent should yield if no ready bins and no incoming flowfiles
    
    Co-authored-by: Ferenc Gerlits <fgerl...@gmail.com>
    Co-authored-by: Gabor Gyimesi <gamezb...@gmail.com>
    Signed-off-by: Gabor Gyimesi <gamezb...@gmail.com>
    
    This closes #1748
---
 extensions/libarchive/BinFiles.cpp             | 16 +++++++++++++---
 extensions/libarchive/BinFiles.h               |  2 +-
 extensions/libarchive/tests/MergeFileTests.cpp | 24 ++++++++++++++++++++++++
 3 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index e2982973d..b0687d46e 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -223,11 +223,14 @@ bool BinFiles::resurrectFlowFiles(core::ProcessSession &session) {
   return had_failure;
 }
 
-void BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) {
+bool BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) {
   for (size_t i = 0; i < batchSize_; ++i) {
     auto flow = session.get();
 
     if (flow == nullptr) {
+      if (i == 0) {  // Batch didn't contain a single flowfile, we should 
yield if there are no ready bins either
+        return false;
+      }
       break;
     }
 
@@ -242,6 +245,7 @@ void BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) {
     session.transfer(flow, Self);
   }
   session.commit();
+  return true;
 }
 
 void BinFiles::processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, 
core::ProcessSession &session) {
@@ -283,8 +287,14 @@ void BinFiles::onTrigger(core::ProcessContext& context, core::ProcessSession& se
     return;
   }
 
-  assumeOwnershipOfNextBatch(session);
-  processReadyBins(gatherReadyBins(context), session);
+  const bool valid_batch = assumeOwnershipOfNextBatch(session);
+  if (auto ready_bins = gatherReadyBins(context); ready_bins.empty()) {
+    if (!valid_batch) {
+      yield();
+    }
+  } else {
+    processReadyBins(std::move(ready_bins), session);
+  }
 }
 
 void BinFiles::transferFlowsToFail(core::ProcessSession &session, 
std::unique_ptr<Bin> &bin) {
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index ab33e2c64..3dbb8fe86 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -279,7 +279,7 @@ class BinFiles : public core::Processor {
 
   // Sort flow files retrieved from the flow file repository after restart to 
their respective bins
   bool resurrectFlowFiles(core::ProcessSession &session);
-  void assumeOwnershipOfNextBatch(core::ProcessSession &session);
+  bool assumeOwnershipOfNextBatch(core::ProcessSession &session);
   std::deque<std::unique_ptr<Bin>> gatherReadyBins(core::ProcessContext 
&context);
   void processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, 
core::ProcessSession &session);
 
diff --git a/extensions/libarchive/tests/MergeFileTests.cpp b/extensions/libarchive/tests/MergeFileTests.cpp
index a98a18cf2..8c3015712 100644
--- a/extensions/libarchive/tests/MergeFileTests.cpp
+++ b/extensions/libarchive/tests/MergeFileTests.cpp
@@ -36,6 +36,7 @@
 #include "MergeContent.h"
 #include "processors/LogAttribute.h"
 #include "TestBase.h"
+#include "SingleProcessorTestController.h"
 #include "Catch.h"
 #include "unit/ProvenanceTestHelper.h"
 #include "serialization/FlowFileV3Serializer.h"
@@ -899,3 +900,26 @@ TEST_CASE_METHOD(MergeTestController, "Maximum Group Size is respected", "[testM
   REQUIRE(expiredFlowRecords.empty());
   REQUIRE_FALSE(flow3);
 }
+
+TEST_CASE("Empty MergeContent yields") {  // no incoming flowfiles and no bins at all: onTrigger is expected to yield
+  const auto merge_content = std::make_shared<minifi::processors::MergeContent>("mergeContent");
+
+  minifi::test::SingleProcessorTestController controller{merge_content};
+  controller.trigger();  // single trigger with an empty input queue
+
+  CHECK(merge_content->isYield());
+}
+
+TEST_CASE("Empty MergeContent doesnt yield when processing readybins") {  // a bin that became ready must be processed instead of yielding, even with no input
+  const auto merge_content = std::make_shared<minifi::processors::MergeContent>("mergeContent");
+
+  minifi::test::SingleProcessorTestController controller{merge_content};
+  controller.plan->setProperty(merge_content, minifi::processors::MergeContent::MaxBinAge, "100ms");
+  controller.plan->setProperty(merge_content, minifi::processors::MergeContent::MinEntries, "2");  // one flowfile never fills the bin, so only MaxBinAge can make it ready
+
+  controller.trigger("foo");  // a flowfile was consumed, so no yield; the bin stays open (1 < MinEntries)
+  CHECK_FALSE(merge_content->isYield());
+  std::this_thread::sleep_for(200ms);  // sleep well past MaxBinAge so clock resolution / the age comparison cannot leave the bin "not yet old"
+  controller.trigger();  // empty input, but the aged-out bin should be ready, so the processor must not yield
+  CHECK_FALSE(merge_content->isYield());
+}

Reply via email to