This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit c5d58e8836723ba232440d457c6ff1be811bd075
Author: Gabor Gyimesi <gamezb...@gmail.com>
AuthorDate: Thu Oct 20 13:13:46 2022 +0200

    MINIFICPP-1967 Add batch processing of lines in TailFile
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #1439
---
 PROCESSORS.md                                      |  1 +
 .../standard-processors/processors/TailFile.cpp    | 14 +++++++++-
 .../standard-processors/processors/TailFile.h      | 23 ++++++-----------
 .../tests/unit/TailFileTests.cpp                   | 30 ++++++++++++++++++++++
 4 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8e7e6e328..8a0aac5de 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -2424,6 +2424,7 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | State File                 | TailFileState     |                             
                           | Specifies the file that should be used for storing 
state about what data has been ingested so that upon restart NiFi can resume 
from where it left off                                                          
                                                                                
                                                                                
                 [...]
 | tail-base-directory        |                   |                             
                           | Base directory used to look for files to tail. 
This property is required when using Multiple file mode. Can contain expression 
language placeholders if Attribute Provider Service is set.<br/>**Supports 
Expression Language: true**                                                     
                                                                                
                       [...]
 | **tail-mode**              | Single file       | Single file<br>Multiple 
file<br>                       | Specifies the tail file mode. In 'Single file' 
mode only a single file will be watched. In 'Multiple file' mode a regex may be 
used. Note that in multiple file mode we will still continue to watch for 
rollover on the initial set of watched files. The Regex used to locate multiple 
files will be run during the schedule phase. Note that if rotated files are 
matched by the regex, thos [...]
+| **Batch Size**             | 0                 |                             
                           | Maximum number of flowfiles emitted in a single 
trigger. If set to 0 all new content will be processed.                         
                                                                                
                                                                                
                                                                                
                 [...]
 ### Relationships
 
 | Name    | Description                     |
diff --git a/extensions/standard-processors/processors/TailFile.cpp 
b/extensions/standard-processors/processors/TailFile.cpp
index 7a3f20194..abfd6784b 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -135,6 +135,13 @@ const core::Property TailFile::AttributeProviderService(
         ->asType<minifi::controllers::AttributeProviderService>()
         ->build());
 
+const core::Property TailFile::BatchSize(
+    core::PropertyBuilder::createProperty("Batch Size")
+        ->withDescription("Maximum number of flowfiles emitted in a single 
trigger. If set to 0 all new content will be processed.")
+        ->isRequired(true)
+        ->withDefaultValue<uint32_t>(0)
+        ->build());
+
 const core::Relationship TailFile::Success("success", "All files are routed to 
success");
 
 const char *TailFile::CURRENT_STR = "CURRENT.";
@@ -395,6 +402,11 @@ void TailFile::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context,
   context->getProperty(RollingFilenamePattern.getName(), 
rolling_filename_pattern_glob);
   rolling_filename_pattern_ = 
utils::file::globToRegex(rolling_filename_pattern_glob);
   initial_start_position_ = 
InitialStartPositions{utils::parsePropertyWithAllowableValuesOrThrow(*context, 
InitialStartPosition.getName(), InitialStartPositions::values())};
+
+  uint32_t batch_size = 0;
+  if (context->getProperty(BatchSize.getName(), batch_size) && batch_size != 
0) {
+    batch_size_ = batch_size;
+  }
 }
 
 void TailFile::parseAttributeProviderServiceProperty(core::ProcessContext& 
context) {
@@ -784,7 +796,7 @@ void TailFile::processSingleFile(const 
std::shared_ptr<core::ProcessSession> &se
     FileReaderCallback file_reader{full_file_name, state.position_, delim, 
state.checksum_};
     TailState state_copy{state};
 
-    while (file_reader.hasMoreToRead()) {
+    while (file_reader.hasMoreToRead() && (!batch_size_ || *batch_size_ > 
num_flow_files)) {
       auto flow_file = session->create();
       session->write(flow_file, std::ref(file_reader));
 
diff --git a/extensions/standard-processors/processors/TailFile.h 
b/extensions/standard-processors/processors/TailFile.h
index 58a9b1122..88cf7c018 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_TAILFILE_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_TAILFILE_H_
+#pragma once
 
 #include <map>
 #include <memory>
@@ -27,6 +26,7 @@
 #include <unordered_map>
 #include <vector>
 #include <set>
+#include <optional>
 
 #include "controllers/AttributeProviderService.h"
 #include "FlowFileRecord.h"
@@ -37,11 +37,7 @@
 #include "utils/Enum.h"
 #include "utils/Export.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 struct TailState {
   TailState(std::string path, std::string file_name, uint64_t position,
@@ -106,6 +102,7 @@ class TailFile : public core::Processor {
   EXTENSIONAPI static const core::Property RollingFilenamePattern;
   EXTENSIONAPI static const core::Property InitialStartPosition;
   EXTENSIONAPI static const core::Property AttributeProviderService;
+  EXTENSIONAPI static const core::Property BatchSize;
 
   static auto properties() {
     return std::array{
@@ -118,7 +115,8 @@ class TailFile : public core::Processor {
       LookupFrequency,
       RollingFilenamePattern,
       InitialStartPosition,
-      AttributeProviderService
+      AttributeProviderService,
+      BatchSize
     };
   }
 
@@ -215,13 +213,8 @@ class TailFile : public core::Processor {
   bool first_trigger_{true};
   controllers::AttributeProviderService* attribute_provider_service_ = nullptr;
   std::unordered_map<std::string, 
controllers::AttributeProviderService::AttributeMap> extra_attributes_;
+  std::optional<uint32_t> batch_size_;
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<TailFile>::getLogger();
 };
 
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_TAILFILE_H_
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp 
b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 387f579a2..808dbb190 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -44,6 +44,7 @@
 #include "LogAttribute.h"
 #include "utils/TestUtils.h"
 #include "utils/StringUtils.h"
+#include "SingleProcessorTestController.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -1847,3 +1848,32 @@ TEST_CASE("TailFile can use an 
AttributeProviderService", "[AttributeProviderSer
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("TailFile honors batch size for maximum lines processed", 
"[batchSize]") {
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+
+  auto tailfile = std::make_shared<minifi::processors::TailFile>("TailFile");
+  minifi::test::SingleProcessorTestController test_controller(tailfile);
+
+  auto dir = test_controller.createTempDirectory();
+  std::stringstream temp_file;
+  temp_file << dir << utils::file::get_separator() << TMP_FILE;
+
+  std::ofstream tmpfile;
+  tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
+  for (auto i = 0; i < 20; ++i) {
+    tmpfile << NEW_TAIL_DATA;
+  }
+  tmpfile.close();
+
+  std::stringstream state_file;
+  state_file << dir << utils::file::get_separator() << STATE_FILE;
+
+  tailfile->setProperty(minifi::processors::TailFile::FileName.getName(), 
temp_file.str());
+  tailfile->setProperty(minifi::processors::TailFile::Delimiter.getName(), 
"\n");
+  tailfile->setProperty(minifi::processors::TailFile::BatchSize.getName(), 
"10");
+
+  const auto result = test_controller.trigger();
+  const auto& file_contents = result.at(minifi::processors::TailFile::Success);
+  REQUIRE(file_contents.size() == 10);
+}

Reply via email to