This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9dd176da272aab53baa79f6f1a1c28b07b4ad6a1 Author: Martin Zink <[email protected]> AuthorDate: Thu Jun 5 19:19:20 2025 +0200 MINIFICPP-2449 TailFile: option to output full batch in a single flow file Signed-off-by: Ferenc Gerlits <[email protected]> Closes #1972 --- PROCESSORS.md | 27 ++++++----- .../standard-processors/processors/TailFile.cpp | 17 ++++--- .../standard-processors/processors/TailFile.h | 30 ++++++++++-- .../tests/unit/TailFileTests.cpp | 56 +++++++++++++++++----- 4 files changed, 94 insertions(+), 36 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 051e7377b..7bb2e948a 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -3150,19 +3150,20 @@ In the list below, the names of required properties appear in bold. Any other pr In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description [...] -|----------------------------|-------------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] -| **File to Tail** | | | Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode [...] -| State File | TailFileState | | DEPRECATED. Only use it for state migration from the legacy state file. [...] -| Input Delimiter | \n | | Specifies the character that should be used for delimiting the data being tailedfrom the incoming file. If none is specified, data will be ingested as it becomes available. [...] -| **tail-mode** | Single file | Single file<br/>Multiple file | Specifies the tail file mode. In 'Single file' mode only a single file will be watched. In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, th [...] -| tail-base-directory | | | Base directory used to look for files to tail. This property is required when using Multiple file mode. Can contain expression language placeholders if Attribute Provider Service is set.<br/>**Supports Expression Language: true** [...] -| Recursive lookup | false | true<br/>false | When using Multiple file mode, this property determines whether files are tailed in child directories of the Base Directory or not. [...] -| Lookup frequency | 10 min | | When using Multiple file mode, this property specifies the minimum duration the processor will wait between looking for new files to tail in the Base Directory. [...] -| Rolling Filename Pattern | ${filename}.* | | If the file to tail "rolls over" as would be the case with log files, this filename pattern will be used to identify files that have rolled over so MiNiFi can read the remaining of the rolled-over file and then continue with the new log file. This pattern supports the wildcard characters * and ?, it also supports the notation ${filename} to specify a pattern based on the name of [...] -| **Initial Start Position** | Beginning of File | Beginning of Time<br/>Beginning of File<br/>Current Time | When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from a file, the Processor will continue from the last point from which it has received data.<br/>Beginning of Time: Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail.<br/>B [...] -| Attribute Provider Service | | | Provides a list of key-value pair records which can be used in the Base Directory property using Expression Language. Requires Multiple file mode. [...] -| **Batch Size** | 0 | | Maximum number of flowfiles emitted in a single trigger. If set to 0 all new content will be processed. [...] +| Name | Default Value | Allowable Values | Description [...] +|----------------------------|-------------------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| **File to Tail** | | | Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode [...] +| State File | TailFileState | | DEPRECATED. Only use it for state migration from the legacy state file. [...] +| Input Delimiter | \n | | Specifies the character that should be used for delimiting the data being tailedfrom the incoming file. If none is specified, data will be ingested as it becomes available. [...] +| **tail-mode** | Single file | Single file<br/>Multiple file | Specifies the tail file mode. In 'Single file' mode only a single file will be watched. In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the reg [...] +| tail-base-directory | | | Base directory used to look for files to tail. This property is required when using Multiple file mode. Can contain expression language placeholders if Attribute Provider Service is set.<br/>**Supports Expression Language: true** [...] +| Recursive lookup | false | true<br/>false | When using Multiple file mode, this property determines whether files are tailed in child directories of the Base Directory or not. [...] +| Lookup frequency | 10 min | | When using Multiple file mode, this property specifies the minimum duration the processor will wait between looking for new files to tail in the Base Directory. [...] +| Rolling Filename Pattern | ${filename}.* | | If the file to tail "rolls over" as would be the case with log files, this filename pattern will be used to identify files that have rolled over so MiNiFi can read the remaining of the rolled-over file and then continue with the new log file. This pattern supports the wildcard characters * and ?, it also supports the notation ${filename} to specify a pattern based on the na [...] +| **Initial Start Position** | Beginning of File | Beginning of Time<br/>Beginning of File<br/>Current Time | When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from a file, the Processor will continue from the last point from which it has received data.<br/>Beginning of Time: Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail. [...] +| Attribute Provider Service | | | Provides a list of key-value pair records which can be used in the Base Directory property using Expression Language. Requires Multiple file mode. [...] +| **Batch Size** | 0 | | Maximum number of lines emitted in a single trigger. If set to 0 all new content will be processed. [...] +| **Result Mode** | Flow file per delimiter | Flow file per delimiter<br/>Flow file per batch | Specifies how the result lines are arranged into output flow files [...] ### Relationships diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp index 5082a8c57..708cf499e 100644 --- a/extensions/standard-processors/processors/TailFile.cpp +++ b/extensions/standard-processors/processors/TailFile.cpp @@ -119,7 +119,7 @@ class FileReaderCallback { while (hasMoreToRead() && !found_delimiter) { if (begin_ == end_) { - input_stream_.read(reinterpret_cast<char *>(buffer_.data()), gsl::narrow<std::streamsize>(buffer_.size())); + input_stream_.read(buffer_.data(), gsl::narrow<std::streamsize>(buffer_.size())); const auto num_bytes_read = input_stream_.gcount(); logger_->log_trace("Read {} bytes of input", std::intmax_t{num_bytes_read}); @@ -246,13 +246,18 @@ void TailFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFac throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); } + const auto result_format = utils::parseEnumProperty<TailResultFormat>(context, ResultFormat); if (auto delimiter_str = context.getProperty(Delimiter)) { - if (auto parsed_delimiter = utils::string::parseCharacter(*delimiter_str)) { - delimiter_ = *parsed_delimiter; + if (result_format == TailResultFormat::FlowFilePerBatch) { + logger_->log_warn("Delimiter({}) is ignored with {}", *delimiter_str, magic_enum::enum_name(TailResultFormat::FlowFilePerBatch)); } else { - logger_->log_error("Invalid {}: \"{}\" (it should be a single character, whether escaped or not). Using the first character as the {}", - TailFile::Delimiter.name, *delimiter_str, TailFile::Delimiter.name); - delimiter_ = getDelimiterOld(*delimiter_str); + if (auto parsed_delimiter = utils::string::parseCharacter(*delimiter_str)) { + delimiter_ = *parsed_delimiter; + } else { + logger_->log_error("Invalid {}: \"{}\" (it should be a single character, whether escaped or not). Using the first character as the {}", + TailFile::Delimiter.name, *delimiter_str, TailFile::Delimiter.name); + delimiter_ = getDelimiterOld(*delimiter_str); + } } } diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h index 700a40a65..3d8fa3a83 100644 --- a/extensions/standard-processors/processors/TailFile.h +++ b/extensions/standard-processors/processors/TailFile.h @@ -48,10 +48,16 @@ enum class InitialStartPositions { CURRENT_TIME }; +enum class TailResultFormat { + FlowFilePerDelimiter, + FlowFilePerBatch +}; + } // namespace org::apache::nifi::minifi::processors namespace magic_enum::customize { using InitialStartPositions = org::apache::nifi::minifi::processors::InitialStartPositions; +using TailResultFormat = org::apache::nifi::minifi::processors::TailResultFormat; template <> constexpr customize_t enum_name<InitialStartPositions>(InitialStartPositions value) noexcept { @@ -65,6 +71,17 @@ constexpr customize_t enum_name<InitialStartPositions>(InitialStartPositions val } return invalid_tag; } + +template<> +constexpr customize_t enum_name<TailResultFormat>(TailResultFormat value) noexcept { + switch (value) { + case TailResultFormat::FlowFilePerBatch: + return "Flow file per batch"; + case TailResultFormat::FlowFilePerDelimiter: + return "Flow file per delimiter"; + } + return invalid_tag; +} } // namespace magic_enum::customize namespace org::apache::nifi::minifi::processors { @@ -109,8 +126,6 @@ class TailFile : public core::ProcessorImpl { logger_ = core::logging::LoggerFactory<TailFile>::getLogger(uuid_); } - ~TailFile() override = default; - EXTENSIONAPI static constexpr const char* Description = "\"Tails\" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual." " Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\"," " as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover" @@ -184,11 +199,17 @@ class TailFile : public core::ProcessorImpl { .withAllowedTypes<minifi::controllers::AttributeProviderService>() .build(); EXTENSIONAPI static constexpr auto BatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size") - .withDescription("Maximum number of flowfiles emitted in a single trigger. If set to 0 all new content will be processed.") + .withDescription("Maximum number of lines emitted in a single trigger. If set to 0 all new content will be processed.") .isRequired(true) .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("0") .build(); + EXTENSIONAPI static constexpr auto ResultFormat = core::PropertyDefinitionBuilder<magic_enum::enum_count<TailResultFormat>()>::createProperty("Result Mode") + .withDescription("Specifies how the result lines are arranged into output flow files") + .isRequired(true) + .withDefaultValue(magic_enum::enum_name(TailResultFormat::FlowFilePerDelimiter)) + .withAllowedValues(magic_enum::enum_names<TailResultFormat>()) + .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ FileName, StateFile, @@ -200,7 +221,8 @@ class TailFile : public core::ProcessorImpl { RollingFilenamePattern, InitialStartPosition, AttributeProviderService, - BatchSize + BatchSize, + ResultFormat }); diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp index b9b98732e..043262e98 100644 --- a/extensions/standard-processors/tests/unit/TailFileTests.cpp +++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp @@ -16,30 +16,31 @@ * limitations under the License. */ +#include <algorithm> #include <cstdio> #include <fstream> -#include <string> #include <iostream> -#include <set> -#include <algorithm> #include <random> +#include <set> +#include <string> + #include "FlowController.h" -#include "unit/TestBase.h" -#include "unit/Catch.h" +#include "LogAttribute.h" +#include "TailFile.h" +#include "TextFragmentUtils.h" +#include "catch2/generators/catch_generators.hpp" #include "core/Core.h" -#include "utils/file/FileUtils.h" -#include "utils/file/PathUtils.h" -#include "unit/ProvenanceTestHelper.h" -#include "core/Processor.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" +#include "core/Processor.h" #include "core/Resource.h" -#include "TailFile.h" -#include "LogAttribute.h" +#include "unit/Catch.h" +#include "unit/ProvenanceTestHelper.h" +#include "unit/SingleProcessorTestController.h" +#include "unit/TestBase.h" #include "unit/TestUtils.h" #include "utils/StringUtils.h" -#include "unit/SingleProcessorTestController.h" -#include "TextFragmentUtils.h" +#include "utils/file/FileUtils.h" using namespace std::literals::chrono_literals; @@ -1804,3 +1805,32 @@ TEST_CASE("TailFile honors batch size for maximum lines processed", "[batchSize] const auto& file_contents = result.at(minifi::processors::TailFile::Success); REQUIRE(file_contents.size() == 10); } + +TEST_CASE("Result mode tests") { + LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); + + minifi::test::SingleProcessorTestController test_controller(std::make_unique<minifi::processors::TailFile>("TailFile")); + auto tail_file = test_controller.getProcessor(); + + auto temp_file_path = test_controller.createTempDirectory() / TMP_FILE; + + const auto [result_mode, ff_count] = GENERATE( + std::make_tuple(minifi::processors::TailResultFormat::FlowFilePerBatch, std::size_t{1}), + std::make_tuple(minifi::processors::TailResultFormat::FlowFilePerDelimiter, std::size_t{5})); + + std::ofstream tmp_file; + tmp_file.open(temp_file_path, std::ios::out | std::ios::binary); + for (auto i = 0; i < 20; ++i) { + tmp_file << NEW_TAIL_DATA; + } + tmp_file.close(); + + CHECK(tail_file->setProperty(minifi::processors::TailFile::FileName.name, temp_file_path.string())); + CHECK(tail_file->setProperty(minifi::processors::TailFile::Delimiter.name, "\n")); + CHECK(tail_file->setProperty(minifi::processors::TailFile::BatchSize.name, "5")); + CHECK(tail_file->setProperty(minifi::processors::TailFile::ResultFormat.name, std::string(magic_enum::enum_name(result_mode)))); + + const auto result = test_controller.trigger(); + const auto& file_contents = result.at(minifi::processors::TailFile::Success); + CHECK(file_contents.size() == ff_count); +}
