This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9dd176da272aab53baa79f6f1a1c28b07b4ad6a1
Author: Martin Zink <[email protected]>
AuthorDate: Thu Jun 5 19:19:20 2025 +0200

    MINIFICPP-2449 TailFile: option to output full batch in a single flow file
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    Closes #1972
---
 PROCESSORS.md                                      | 27 ++++++-----
 .../standard-processors/processors/TailFile.cpp    | 17 ++++---
 .../standard-processors/processors/TailFile.h      | 30 ++++++++++--
 .../tests/unit/TailFileTests.cpp                   | 56 +++++++++++++++++-----
 4 files changed, 94 insertions(+), 36 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 051e7377b..7bb2e948a 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -3150,19 +3150,20 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
 
-| Name                       | Default Value     | Allowable Values            
                             | Description                                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|----------------------------|-------------------|----------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| **File to Tail**           |                   |                             
                             | Fully-qualified filename of the file that should 
be tailed when using single file mode, or a file regex when using multifile 
mode                                                                            
                                                                                
                                                                                
                  [...]
-| State File                 | TailFileState     |                             
                             | DEPRECATED. Only use it for state migration from 
the legacy state file.                                                          
                                                                                
                                                                                
                                                                                
              [...]
-| Input Delimiter            | \n                |                             
                             | Specifies the character that should be used for 
delimiting the data being tailedfrom the incoming file. If none is specified, 
data will be ingested as it becomes available.                                  
                                                                                
                                                                                
                 [...]
-| **tail-mode**              | Single file       | Single file<br/>Multiple 
file                            | Specifies the tail file mode. In 'Single 
file' mode only a single file will be watched. In 'Multiple file' mode a regex 
may be used. Note that in multiple file mode we will still continue to watch 
for rollover on the initial set of watched files. The Regex used to locate 
multiple files will be run during the schedule phrase. Note that if rotated 
files are matched by the regex, th [...]
-| tail-base-directory        |                   |                             
                             | Base directory used to look for files to tail. 
This property is required when using Multiple file mode. Can contain expression 
language placeholders if Attribute Provider Service is set.<br/>**Supports 
Expression Language: true**                                                     
                                                                                
                     [...]
-| Recursive lookup           | false             | true<br/>false              
                             | When using Multiple file mode, this property 
determines whether files are tailed in child directories of the Base Directory 
or not.                                                                         
                                                                                
                                                                                
                   [...]
-| Lookup frequency           | 10 min            |                             
                             | When using Multiple file mode, this property 
specifies the minimum duration the processor will wait between looking for new 
files to tail in the Base Directory.                                            
                                                                                
                                                                                
                   [...]
-| Rolling Filename Pattern   | ${filename}.*     |                             
                             | If the file to tail "rolls over" as would be the 
case with log files, this filename pattern will be used to identify files that 
have rolled over so MiNiFi can read the remaining of the rolled-over file and 
then continue with the new log file. This pattern supports the wildcard 
characters * and ?, it also supports the notation ${filename} to specify a 
pattern based on the name of  [...]
-| **Initial Start Position** | Beginning of File | Beginning of 
Time<br/>Beginning of File<br/>Current Time | When the Processor first begins 
to tail data, this property specifies where the Processor should begin reading 
data. Once data has been ingested from a file, the Processor will continue from 
the last point from which it has received data.<br/>Beginning of Time: Start 
with the oldest data that matches the Rolling Filename Pattern and then begin 
reading from the File to Tail.<br/>B [...]
-| Attribute Provider Service |                   |                             
                             | Provides a list of key-value pair records which 
can be used in the Base Directory property using Expression Language. Requires 
Multiple file mode.                                                             
                                                                                
                                                                                
                [...]
-| **Batch Size**             | 0                 |                             
                             | Maximum number of flowfiles emitted in a single 
trigger. If set to 0 all new content will be processed.                         
                                                                                
                                                                                
                                                                                
               [...]
+| Name                       | Default Value           | Allowable Values      
                                   | Description                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|----------------------------|-------------------------|----------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| **File to Tail**           |                         |                       
                                   | Fully-qualified filename of the file that 
should be tailed when using single file mode, or a file regex when using 
multifile mode                                                                  
                                                                                
                                                                                
                      [...]
+| State File                 | TailFileState           |                       
                                   | DEPRECATED. Only use it for state 
migration from the legacy state file.                                           
                                                                                
                                                                                
                                                                                
                       [...]
+| Input Delimiter            | \n                      |                       
                                   | Specifies the character that should be 
used for delimiting the data being tailed from the incoming file. If none is 
specified, data will be ingested as it becomes available.                       
                                                                                
                                                                                
                      [...]
+| **tail-mode**              | Single file             | Single 
file<br/>Multiple file                            | Specifies the tail file 
mode. In 'Single file' mode only a single file will be watched. In 'Multiple 
file' mode a regex may be used. Note that in multiple file mode we will still 
continue to watch for rollover on the initial set of watched files. The Regex 
used to locate multiple files will be run during the schedule phase. Note that 
if rotated files are matched by the reg [...]
+| tail-base-directory        |                         |                       
                                   | Base directory used to look for files to 
tail. This property is required when using Multiple file mode. Can contain 
expression language placeholders if Attribute Provider Service is 
set.<br/>**Supports Expression Language: true**                                 
                                                                                
                                   [...]
+| Recursive lookup           | false                   | true<br/>false        
                                   | When using Multiple file mode, this 
property determines whether files are tailed in child directories of the Base 
Directory or not.                                                               
                                                                                
                                                                                
                       [...]
+| Lookup frequency           | 10 min                  |                       
                                   | When using Multiple file mode, this 
property specifies the minimum duration the processor will wait between looking 
for new files to tail in the Base Directory.                                    
                                                                                
                                                                                
                     [...]
+| Rolling Filename Pattern   | ${filename}.*           |                       
                                   | If the file to tail "rolls over" as would 
be the case with log files, this filename pattern will be used to identify 
files that have rolled over so MiNiFi can read the remaining of the rolled-over 
file and then continue with the new log file. This pattern supports the 
wildcard characters * and ?, it also supports the notation ${filename} to 
specify a pattern based on the na [...]
+| **Initial Start Position** | Beginning of File       | Beginning of 
Time<br/>Beginning of File<br/>Current Time | When the Processor first begins 
to tail data, this property specifies where the Processor should begin reading 
data. Once data has been ingested from a file, the Processor will continue from 
the last point from which it has received data.<br/>Beginning of Time: Start 
with the oldest data that matches the Rolling Filename Pattern and then begin 
reading from the File to Tail. [...]
+| Attribute Provider Service |                         |                       
                                   | Provides a list of key-value pair records 
which can be used in the Base Directory property using Expression Language. 
Requires Multiple file mode.                                                    
                                                                                
                                                                                
                   [...]
+| **Batch Size**             | 0                       |                       
                                   | Maximum number of lines emitted in a 
single trigger. If set to 0 all new content will be processed.                  
                                                                                
                                                                                
                                                                                
                    [...]
+| **Result Mode**            | Flow file per delimiter | Flow file per 
delimiter<br/>Flow file per batch          | Specifies how the result lines are 
arranged into output flow files                                                 
                                                                                
                                                                                
                                                                                
                      [...]
 
 ### Relationships
 
diff --git a/extensions/standard-processors/processors/TailFile.cpp 
b/extensions/standard-processors/processors/TailFile.cpp
index 5082a8c57..708cf499e 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -119,7 +119,7 @@ class FileReaderCallback {
 
     while (hasMoreToRead() && !found_delimiter) {
       if (begin_ == end_) {
-        input_stream_.read(reinterpret_cast<char *>(buffer_.data()), 
gsl::narrow<std::streamsize>(buffer_.size()));
+        input_stream_.read(buffer_.data(), 
gsl::narrow<std::streamsize>(buffer_.size()));
 
         const auto num_bytes_read = input_stream_.gcount();
         logger_->log_trace("Read {} bytes of input", 
std::intmax_t{num_bytes_read});
@@ -246,13 +246,18 @@ void TailFile::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFac
     throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
+  const auto result_format = 
utils::parseEnumProperty<TailResultFormat>(context, ResultFormat);
   if (auto delimiter_str = context.getProperty(Delimiter)) {
-    if (auto parsed_delimiter = utils::string::parseCharacter(*delimiter_str)) 
{
-      delimiter_ = *parsed_delimiter;
+    if (result_format == TailResultFormat::FlowFilePerBatch) {
+      logger_->log_warn("Delimiter({}) is ignored with {}", *delimiter_str, 
magic_enum::enum_name(TailResultFormat::FlowFilePerBatch));
     } else {
-      logger_->log_error("Invalid {}: \"{}\" (it should be a single character, 
whether escaped or not). Using the first character as the {}",
-          TailFile::Delimiter.name, *delimiter_str, TailFile::Delimiter.name);
-      delimiter_ = getDelimiterOld(*delimiter_str);
+      if (auto parsed_delimiter = 
utils::string::parseCharacter(*delimiter_str)) {
+        delimiter_ = *parsed_delimiter;
+      } else {
+        logger_->log_error("Invalid {}: \"{}\" (it should be a single 
character, whether escaped or not). Using the first character as the {}",
+            TailFile::Delimiter.name, *delimiter_str, 
TailFile::Delimiter.name);
+        delimiter_ = getDelimiterOld(*delimiter_str);
+      }
     }
   }
 
diff --git a/extensions/standard-processors/processors/TailFile.h 
b/extensions/standard-processors/processors/TailFile.h
index 700a40a65..3d8fa3a83 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -48,10 +48,16 @@ enum class InitialStartPositions {
   CURRENT_TIME
 };
 
+enum class TailResultFormat {
+  FlowFilePerDelimiter,
+  FlowFilePerBatch
+};
+
 }  // namespace org::apache::nifi::minifi::processors
 
 namespace magic_enum::customize {
 using InitialStartPositions = 
org::apache::nifi::minifi::processors::InitialStartPositions;
+using TailResultFormat = 
org::apache::nifi::minifi::processors::TailResultFormat;
 
 template <>
 constexpr customize_t enum_name<InitialStartPositions>(InitialStartPositions 
value) noexcept {
@@ -65,6 +71,17 @@ constexpr customize_t 
enum_name<InitialStartPositions>(InitialStartPositions val
   }
   return invalid_tag;
 }
+
+template<>
+constexpr customize_t enum_name<TailResultFormat>(TailResultFormat value) 
noexcept {
+  switch (value) {
+    case TailResultFormat::FlowFilePerBatch:
+      return "Flow file per batch";
+    case TailResultFormat::FlowFilePerDelimiter:
+      return "Flow file per delimiter";
+  }
+  return invalid_tag;
+}
 }  // namespace magic_enum::customize
 
 namespace org::apache::nifi::minifi::processors {
@@ -109,8 +126,6 @@ class TailFile : public core::ProcessorImpl {
     logger_ = core::logging::LoggerFactory<TailFile>::getLogger(uuid_);
   }
 
-  ~TailFile() override = default;
-
   EXTENSIONAPI static constexpr const char* Description = "\"Tails\" a file, 
or a list of files, ingesting data from the file as it is written to the file. 
The file is expected to be textual."
       " Data is ingested only when a new line is encountered (carriage return 
or new-line character or combination). If the file to tail is periodically 
\"rolled over\","
       " as is generally the case with log files, an optional Rolling Filename 
Pattern can be used to retrieve data from files that have rolled over, even if 
the rollover"
@@ -184,11 +199,17 @@ class TailFile : public core::ProcessorImpl {
       .withAllowedTypes<minifi::controllers::AttributeProviderService>()
       .build();
   EXTENSIONAPI static constexpr auto BatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Batch Size")
-      .withDescription("Maximum number of flowfiles emitted in a single 
trigger. If set to 0 all new content will be processed.")
+      .withDescription("Maximum number of lines emitted in a single trigger. 
If set to 0 all new content will be processed.")
       .isRequired(true)
       
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
       .withDefaultValue("0")
       .build();
+  EXTENSIONAPI static constexpr auto ResultFormat = 
core::PropertyDefinitionBuilder<magic_enum::enum_count<TailResultFormat>()>::createProperty("Result
 Mode")
+      .withDescription("Specifies how the result lines are arranged into 
output flow files")
+      .isRequired(true)
+      
.withDefaultValue(magic_enum::enum_name(TailResultFormat::FlowFilePerDelimiter))
+      .withAllowedValues(magic_enum::enum_names<TailResultFormat>())
+      .build();
   EXTENSIONAPI static constexpr auto Properties = 
std::to_array<core::PropertyReference>({
       FileName,
       StateFile,
@@ -200,7 +221,8 @@ class TailFile : public core::ProcessorImpl {
       RollingFilenamePattern,
       InitialStartPosition,
       AttributeProviderService,
-      BatchSize
+      BatchSize,
+      ResultFormat
   });
 
 
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp 
b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index b9b98732e..043262e98 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -16,30 +16,31 @@
  * limitations under the License.
  */
 
+#include <algorithm>
 #include <cstdio>
 #include <fstream>
-#include <string>
 #include <iostream>
-#include <set>
-#include <algorithm>
 #include <random>
+#include <set>
+#include <string>
+
 #include "FlowController.h"
-#include "unit/TestBase.h"
-#include "unit/Catch.h"
+#include "LogAttribute.h"
+#include "TailFile.h"
+#include "TextFragmentUtils.h"
+#include "catch2/generators/catch_generators.hpp"
 #include "core/Core.h"
-#include "utils/file/FileUtils.h"
-#include "utils/file/PathUtils.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "core/Processor.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
+#include "core/Processor.h"
 #include "core/Resource.h"
-#include "TailFile.h"
-#include "LogAttribute.h"
+#include "unit/Catch.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
 #include "unit/TestUtils.h"
 #include "utils/StringUtils.h"
-#include "unit/SingleProcessorTestController.h"
-#include "TextFragmentUtils.h"
+#include "utils/file/FileUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -1804,3 +1805,32 @@ TEST_CASE("TailFile honors batch size for maximum lines 
processed", "[batchSize]
   const auto& file_contents = result.at(minifi::processors::TailFile::Success);
   REQUIRE(file_contents.size() == 10);
 }
+
+TEST_CASE("Result mode tests") {
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+
+  minifi::test::SingleProcessorTestController 
test_controller(std::make_unique<minifi::processors::TailFile>("TailFile"));
+  auto tail_file = test_controller.getProcessor();
+
+  auto temp_file_path  = test_controller.createTempDirectory() / TMP_FILE;
+
+  const auto [result_mode, ff_count] = GENERATE(
+    std::make_tuple(minifi::processors::TailResultFormat::FlowFilePerBatch, 
std::size_t{1}),
+    
std::make_tuple(minifi::processors::TailResultFormat::FlowFilePerDelimiter, 
std::size_t{5}));
+
+  std::ofstream tmp_file;
+  tmp_file.open(temp_file_path, std::ios::out | std::ios::binary);
+  for (auto i = 0; i < 20; ++i) {
+    tmp_file << NEW_TAIL_DATA;
+  }
+  tmp_file.close();
+
+  CHECK(tail_file->setProperty(minifi::processors::TailFile::FileName.name, 
temp_file_path.string()));
+  CHECK(tail_file->setProperty(minifi::processors::TailFile::Delimiter.name, 
"\n"));
+  CHECK(tail_file->setProperty(minifi::processors::TailFile::BatchSize.name, 
"5"));
+  
CHECK(tail_file->setProperty(minifi::processors::TailFile::ResultFormat.name, 
std::string(magic_enum::enum_name(result_mode))));
+
+  const auto result = test_controller.trigger();
+  const auto& file_contents = result.at(minifi::processors::TailFile::Success);
+  CHECK(file_contents.size() == ff_count);
+}

Reply via email to