This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a8ae2f30 MINIFICPP-2291 Fix the site-to-site transfer of large files
1a8ae2f30 is described below

commit 1a8ae2f30fd6465cfa5dce062b70fbc846ff8940
Author: Ferenc Gerlits <fgerl...@gmail.com>
AuthorDate: Thu Jan 25 16:37:52 2024 +0100

    MINIFICPP-2291 Fix the site-to-site transfer of large files
    
    Previously, the site-to-site client treated the TRANSACTION_FINISHED_BUT_DESTINATION_FULL
    message from the server as a failure, and kept rolling back and retrying the flow file.
    Now, we treat it as success but add a yield so the server has time to clear the incoming
    connection, the same way as NiFi does.
    
    Closes #1720
    
    Signed-off-by: Marton Szasz <sza...@apache.org>
---
 docker/test/integration/features/s2s.feature              | 15 +++++++++++++++
 .../flow_serialization/Nifi_flow_json_serializer.py       |  4 ++--
 extensions/standard-processors/processors/GetFile.cpp     |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h           |  2 +-
 libminifi/src/core/FlowConfiguration.cpp                  |  2 +-
 libminifi/src/sitetosite/RawSocketProtocol.cpp            |  2 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp             | 15 ++++++++++-----
 7 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/docker/test/integration/features/s2s.feature 
b/docker/test/integration/features/s2s.feature
index 6b5f2e6f0..c236f2ac7 100644
--- a/docker/test/integration/features/s2s.feature
+++ b/docker/test/integration/features/s2s.feature
@@ -34,6 +34,21 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S 
protocol
 
     When both instances start up
     Then a flowfile with the content "test" is placed in the monitored 
directory in less than 90 seconds
+    And the Minifi logs do not contain the following message: "ProcessSession 
rollback" after 1 seconds
+
+  Scenario: A MiNiFi instance produces and transfers a large data file to a 
NiFi instance via s2s
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "this is a very long file we want to send by 
site-to-site" is present in "/tmp/input"
+    And a RemoteProcessGroup node opened on 
"http://nifi-${feature_id}:8080/nifi";
+    And the "success" relationship of the GetFile processor is connected to 
the input port on the RemoteProcessGroup
+
+    And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on 
port 8080
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "nifi" flow
+    And the "success" relationship of the from-minifi is connected to the 
PutFile
+
+    When both instances start up
+    Then a flowfile with the content "this is a very long file we want to send 
by site-to-site" is placed in the monitored directory in less than 90 seconds
+    And the Minifi logs do not contain the following message: "ProcessSession 
rollback" after 1 seconds
 
   Scenario: Zero length files are transfered between via s2s if the "drop 
empty" connection property is false
     Given a MiNiFi CPP server with yaml config
diff --git 
a/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py
 
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py
index 2c24f2d36..fa3f36ac6 100644
--- 
a/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py
+++ 
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_json_serializer.py
@@ -210,8 +210,8 @@ class Nifi_flow_json_serializer:
                     "labelIndex": 1,
                     "zIndex": 0,
                     "selectedRelationships": [conn_name] if not 
isinstance(connectable, InputPort) else [""],
-                    "backPressureObjectThreshold": 10000,
-                    "backPressureDataSizeThreshold": "1 GB",
+                    "backPressureObjectThreshold": 10,
+                    "backPressureDataSizeThreshold": "50 B",
                     "flowFileExpiration": "0 sec",
                     "prioritizers": [],
                     "bends": [],
diff --git a/extensions/standard-processors/processors/GetFile.cpp 
b/extensions/standard-processors/processors/GetFile.cpp
index b7c5e174b..318996730 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -78,7 +78,7 @@ void GetFile::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFact
 
   if (auto directory_str = context.getProperty(Directory)) {
     if (!utils::file::is_directory(*directory_str)) {
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory \"" + value 
+ "\" is not a directory");
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
utils::string::join_pack("Input Directory \"", *directory_str, "\" is not a 
directory"));
     }
     request_.inputDirectory = *directory_str;
   } else {
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h 
b/libminifi/include/sitetosite/SiteToSiteClient.h
index a39e4d008..53f4b2f29 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -201,7 +201,7 @@ class SiteToSiteClient : public core::Connectable {
   // Cancel the transaction
   virtual void cancel(const utils::Identifier &transactionID);
   // Complete the transaction
-  virtual bool complete(const utils::Identifier &transactionID);
+  virtual bool complete(core::ProcessContext& context, const utils::Identifier 
&transactionID);
   // Error the transaction
   virtual void error(const utils::Identifier &transactionID);
 
diff --git a/libminifi/src/core/FlowConfiguration.cpp 
b/libminifi/src/core/FlowConfiguration.cpp
index f955372b0..81729a1fb 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -143,7 +143,7 @@ bool FlowConfiguration::persist(const std::string& 
serialized_flow) {
   }
 
   const bool status = filesystem_->write(*config_path_, serialized_flow);
-  logger_->log_info("Result of updating the config file {}: {}", config_path_, 
status ? "success" : "failure");
+  logger_->log_info("Result of updating the config file {}: {}", 
*config_path_, status ? "success" : "failure");
   checksum_calculator_.invalidateChecksum();
   return status;
 }
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp 
b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 040029888..1a28cfc35 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -595,7 +595,7 @@ bool 
RawSiteToSiteClient::transmitPayload(core::ProcessContext& context, core::P
     if (!confirm(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + 
transactionID.to_string());
     }
-    if (!complete(transactionID)) {
+    if (!complete(context, transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + 
transactionID.to_string());
     }
     logger_->log_info("Site2Site transaction {} successfully send flow record 
{} content bytes {}", transactionID.to_string(), 
transaction->current_transfers_, transaction->_bytes);
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp 
b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 429ac70f5..725e200f3 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -164,7 +164,7 @@ bool 
SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, core::Pr
     if (!confirm(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + 
transactionID.to_string());
     }
-    if (!complete(transactionID)) {
+    if (!complete(context, transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + 
transactionID.to_string());
     }
     logger_->log_debug("Site2Site transaction {} successfully sent flow record 
{}, content bytes {}", transactionID.to_string(), 
transaction->total_transfers_, transaction->_bytes);
@@ -336,7 +336,7 @@ void SiteToSiteClient::error(const utils::Identifier& 
transactionID) {
 }
 
 // Complete the transaction
-bool SiteToSiteClient::complete(const utils::Identifier& transactionID) {
+bool SiteToSiteClient::complete(core::ProcessContext& context, const 
utils::Identifier& transactionID) {
   int ret = 0;
   std::shared_ptr<Transaction> transaction = nullptr;
 
@@ -382,12 +382,17 @@ bool SiteToSiteClient::complete(const utils::Identifier& 
transactionID) {
     if (ret <= 0)
       return false;
 
-    if (code == TRANSACTION_FINISHED) {
+    if (code == TRANSACTION_FINISHED || code == 
TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
       logger_->log_info("Site2Site transaction {} peer finished transaction", 
transactionID.to_string());
       transaction->_state = TRANSACTION_COMPLETED;
+
+      if (code == TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
+        logger_->log_info("Site2Site transaction {} reported destination full, 
yielding", transactionID.to_string());
+        context.yield();
+      }
       return true;
     } else {
-      logger_->log_warn("Site2Site transaction {} peer unknown respond code 
{}", transactionID.to_string(), magic_enum::enum_underlying(code));
+      logger_->log_warn("Site2Site transaction {} peer unexpected respond code 
{}: {}", transactionID.to_string(), magic_enum::enum_underlying(code), 
magic_enum::enum_name(code));
       return false;
     }
   }
@@ -718,7 +723,7 @@ bool 
SiteToSiteClient::receiveFlowFiles(core::ProcessContext& context, core::Pro
     if (transfers > 0 && !confirm(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
     }
-    if (!complete(transactionID)) {
+    if (!complete(context, transactionID)) {
       std::stringstream transaction_str;
       transaction_str << "Complete Transaction " << transactionID.to_string() 
<< " Failed";
       throw Exception(SITE2SITE_EXCEPTION, transaction_str.str());

Reply via email to