This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 624743dadcaff9d933f335995bccf1a430ddfea3 Author: Martin Zink <[email protected]> AuthorDate: Mon Oct 28 15:50:02 2024 +0100 MINIFICPP-2465 SplitContent Closes #1872 Signed-off-by: Marton Szasz <[email protected]> --- PROCESSORS.md | 35 ++ README.md | 6 +- .../processors/SplitContent.cpp | 224 ++++++++ .../standard-processors/processors/SplitContent.h | 130 +++++ .../tests/unit/SplitContentTests.cpp | 597 +++++++++++++++++++++ libminifi/include/io/InputStream.h | 8 + libminifi/src/core/ProcessSession.cpp | 1 + libminifi/test/libtest/unit/TestBase.cpp | 18 +- libminifi/test/libtest/unit/TestBase.h | 1 + 9 files changed, 1012 insertions(+), 8 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index ff8488673..0a637c31c 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -93,6 +93,7 @@ limitations under the License. - [RetryFlowFile](#RetryFlowFile) - [RouteOnAttribute](#RouteOnAttribute) - [RouteText](#RouteText) +- [SplitContent](#SplitContent) - [SplitText](#SplitText) - [TailEventLog](#TailEventLog) - [TailFile](#TailFile) @@ -2809,6 +2810,40 @@ In the list below, the names of required properties appear in bold. Any other pr | success | All files, containing log events, are routed to success | +## SplitContent + +### Description + +Splits incoming FlowFiles by a specified byte sequence + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. 
+ +| Name | Default Value | Allowable Values | Description | +|----------------------------|---------------|----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Byte Sequence Format** | Hexadecimal | Hexadecimal<br/>Text | Specifies how the <Byte Sequence> property should be interpreted | +| **Byte Sequence** | | | A representation of bytes to look for and upon which to split the source file into separate files | +| **Keep Byte Sequence** | false | true<br/>false | Determines whether or not the Byte Sequence should be included with each Split | +| **Byte Sequence Location** | Trailing | Trailing<br/>Leading | If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; if <Keep Byte Sequence> is false, this property is ignored. 
| + +### Relationships + +| Name | Description | +|----------|------------------------------------------------------| +| original | The original file | +| splits | All Splits will be routed to the splits relationship | + +### Output Attributes + +| Attribute | Relationship | Description | +|---------------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------| +| fragment.identifier | | All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute | +| fragment.index | | A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile | +| fragment.count | | The number of split FlowFiles generated from the parent FlowFile | +| segment.original.filename | | The filename of the parent FlowFile | + + ## SplitText ### Description diff --git a/README.md b/README.md index b80108b8d..a79023fd8 100644 --- a/README.md +++ b/README.md @@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors: The following table lists the base set of processors. -| Extension Set | Processors [...] -|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...] 
-| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[AttributesToJSON](PROCESSORS.md#attributestojson)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[InvokeH [...] +| Extension Set | Processors [...] +|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...] +| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[AttributesToJSON](PROCESSORS.md#attributestojson)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[InvokeH [...] The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as RocksDB ), can be disabled with the respective CMAKE flag on the command line. 
diff --git a/extensions/standard-processors/processors/SplitContent.cpp b/extensions/standard-processors/processors/SplitContent.cpp
new file mode 100644
index 000000000..cfcfb3416
--- /dev/null
+++ b/extensions/standard-processors/processors/SplitContent.cpp
@@ -0,0 +1,224 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SplitContent.h"
+
+#include <algorithm>
+#include <iterator>
+#include <optional>
+#include <span>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::processors {
+void SplitContent::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+/// Parses the byte sequence (hex or raw text) and the keep/location options; throws if the byte sequence is empty.
+void SplitContent::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
+  auto byte_sequence_str = utils::getRequiredPropertyOrThrow<std::string>(context, ByteSequence.name);
+  const auto byte_sequence_format = utils::parseEnumProperty<ByteSequenceFormat>(context, ByteSequenceFormatProperty);
+  std::vector<std::byte> byte_sequence{};
+  if (byte_sequence_format == ByteSequenceFormat::Hexadecimal) {
+    byte_sequence = utils::string::from_hex(byte_sequence_str);
+  } else {
+    // Text format: the delimiter is the raw bytes of the property value
+    byte_sequence.resize(byte_sequence_str.size());
+    std::ranges::transform(byte_sequence_str, byte_sequence.begin(), [](const char c) { return static_cast<std::byte>(c); });
+  }
+  if (byte_sequence.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Cannot operate without byte sequence"); }
+  // Construct in place: the matcher keeps its sequence in a const member, so emplacing a temporary would copy the sequence.
+  byte_sequence_matcher_.emplace(std::move(byte_sequence));
+  byte_sequence_location_ = utils::parseEnumProperty<ByteSequenceLocation>(context, ByteSequenceLocationProperty);
+  keep_byte_sequence = utils::getRequiredPropertyOrThrow<bool>(context, KeepByteSequence.name);
+}
+
+namespace {
+/**
+ * Cuts the content of a single FlowFile into splits, fed one byte at a time via digest(), creating the split FlowFiles in the session.
+ *
+ * Bytes that form the current partial delimiter match (the first matching_bytes_ bytes of the byte sequence) are not stored;
+ * all other bytes are buffered in data_before_byte_sequence_ and appended to the current split when a delimiter completes,
+ * or when flushIfBufferTooLarge() finds the buffer has reached SplitContent::BUFFER_TARGET_SIZE.
+ *
+ * finish() must be called after the last digest() to emit the final split and transfer every split. This is deliberately not
+ * done in the destructor: session operations may throw, and an exception escaping a destructor calls std::terminate; it would
+ * also transfer half-built splits while an exception from the read loop is propagating. Without finish(), nothing is transferred.
+ */
+class Splitter {
+ public:
+  explicit Splitter(core::ProcessSession& session, std::optional<std::string> original_filename, SplitContent::ByteSequenceMatcher& byte_sequence_matcher, const bool keep_byte_sequence,
+      const SplitContent::ByteSequenceLocation byte_sequence_location)
+      : session_(session),
+        original_filename_(std::move(original_filename)),
+        byte_sequence_matcher_(byte_sequence_matcher),
+        keep_trailing_byte_sequence_(keep_byte_sequence && byte_sequence_location == SplitContent::ByteSequenceLocation::Trailing),
+        keep_leading_byte_sequence_(keep_byte_sequence && byte_sequence_location == SplitContent::ByteSequenceLocation::Leading) {
+    data_before_byte_sequence_.reserve(SplitContent::BUFFER_TARGET_SIZE);
+  }
+
+  Splitter(const Splitter&) = delete;
+  Splitter& operator=(const Splitter&) = delete;
+  Splitter(Splitter&&) = delete;
+  Splitter& operator=(Splitter&&) = delete;
+  ~Splitter() = default;
+
+  /// Consumes the next input byte; when it completes the byte sequence, the current split is closed.
+  void digest(const std::byte b) {
+    const auto prev_matching_bytes = matching_bytes_;
+    matching_bytes_ = byte_sequence_matcher_.getNumberOfMatchingBytes(matching_bytes_, b);
+    if (matchedByteSequence()) {
+      appendDataBeforeByteSequenceToSplit();
+      if (keep_trailing_byte_sequence_) { appendByteSequenceToSplit(); }
+      closeCurrentSplit();
+
+      // possible new split
+      if (keep_leading_byte_sequence_) { appendByteSequenceToSplit(); }
+      // matches do not overlap: after a complete byte sequence, matching restarts from scratch
+      matching_bytes_ = 0;
+      return;
+    }
+    // matching grew, no need to grow data_before_byte_sequence
+    if (matching_bytes_ > prev_matching_bytes) { return; }
+
+    // The window "previous partial match + b" (prev_matching_bytes + 1 bytes) shrank to a match of matching_bytes_ bytes
+    // at its end, so its leading (prev_matching_bytes + 1 - matching_bytes_) bytes are ordinary data.
+    if (matching_bytes_ > 0) {
+      // last byte could be part of the byte_sequence
+      std::copy_n(getByteSequence().begin(), prev_matching_bytes - matching_bytes_ + 1, std::back_inserter(data_before_byte_sequence_));
+    } else {
+      // last byte is not part of the byte_sequence
+      std::copy_n(getByteSequence().begin(), prev_matching_bytes - matching_bytes_, std::back_inserter(data_before_byte_sequence_));
+      data_before_byte_sequence_.push_back(b);
+    }
+  }
+
+  /// Appends the buffered data to the current split once the buffer reaches SplitContent::BUFFER_TARGET_SIZE.
+  void flushIfBufferTooLarge() {
+    if (data_before_byte_sequence_.size() >= SplitContent::BUFFER_TARGET_SIZE) { appendDataBeforeByteSequenceToSplit(); }
+  }
+
+  /// Emits the trailing split and transfers all splits (with fragment attributes) to SplitContent::Splits.
+  /// Call once after the last digest(); a repeated call transfers nothing new.
+  void finish() {
+    flushRemainingData();
+    updateSplitAttributesAndTransfer();
+  }
+
+ private:
+  void closeCurrentSplit() {
+    if (current_split_) {
+      completed_splits_.push_back(current_split_);
+      current_split_.reset();
+    }
+  }
+
+  void appendDataBeforeByteSequenceToSplit() {
+    if (data_before_byte_sequence_.empty()) { return; }
+    ensureCurrentSplit();
+    session_.appendBuffer(current_split_, std::span<const std::byte>(data_before_byte_sequence_.data(), data_before_byte_sequence_.size()));
+    data_before_byte_sequence_.clear();
+  }
+
+  void appendByteSequenceToSplit() {
+    ensureCurrentSplit();
+    session_.appendBuffer(current_split_, getByteSequence());
+  }
+
+  [[nodiscard]] std::span<const std::byte> getByteSequence() const { return byte_sequence_matcher_.getByteSequence(); }
+
+  void ensureCurrentSplit() {
+    if (!current_split_) { current_split_ = session_.create(); }
+  }
+
+  [[nodiscard]] bool matchedByteSequence() const { return matching_bytes_ == byte_sequence_matcher_.getByteSequence().size(); }
+
+  /// Closes the last split. A partial byte sequence match at the end of the input is ordinary content and is kept.
+  /// No split is emitted when there is neither an open split nor any leftover data.
+  void flushRemainingData() {
+    if (matching_bytes_ > 0) {
+      const auto partial_match = getByteSequence().first(matching_bytes_);
+      data_before_byte_sequence_.insert(data_before_byte_sequence_.end(), partial_match.begin(), partial_match.end());
+      matching_bytes_ = 0;
+    }
+    appendDataBeforeByteSequenceToSplit();
+    closeCurrentSplit();
+  }
+
+  void updateSplitAttributesAndTransfer() {
+    const std::string fragment_identifier = utils::IdGenerator::getIdGenerator()->generate().to_string();
+    const std::string fragment_count = std::to_string(completed_splits_.size());
+    for (size_t split_i = 0; split_i < completed_splits_.size(); ++split_i) {
+      const auto& split = completed_splits_[split_i];
+      split->setAttribute(SplitContent::FragmentCountOutputAttribute.name, fragment_count);
+      split->setAttribute(SplitContent::FragmentIndexOutputAttribute.name, std::to_string(split_i + 1));  // One based indexing
+      split->setAttribute(SplitContent::FragmentIdentifierOutputAttribute.name, fragment_identifier);
+      split->setAttribute(SplitContent::SegmentOriginalFilenameOutputAttribute.name, original_filename_.value_or(""));
+      session_.transfer(split, SplitContent::Splits);
+    }
+    completed_splits_.clear();
+  }
+
+  core::ProcessSession& session_;
+  const std::optional<std::string> original_filename_;
+  SplitContent::ByteSequenceMatcher& byte_sequence_matcher_;
+  std::vector<std::byte> data_before_byte_sequence_;
+  std::shared_ptr<core::FlowFile> current_split_ = nullptr;
+  std::vector<std::shared_ptr<core::FlowFile>> completed_splits_;
+  SplitContent::size_type matching_bytes_ = 0;
+
+  const bool keep_trailing_byte_sequence_ = false;
+  const bool keep_leading_byte_sequence_ = false;
+};
+}  // namespace
+
+SplitContent::ByteSequenceMatcher::ByteSequenceMatcher(std::vector<std::byte> byte_sequence) : byte_sequence_(std::move(byte_sequence)) {
+  // Node i is the state "the first i bytes of the sequence are matched"; its byte is the sequence byte consumed to reach it
+  // (byte_sequence_[i - 1]). Node 0 (nothing matched) has no byte of its own.
+  byte_sequence_nodes_.reserve(byte_sequence_.size() + 1);
+  byte_sequence_nodes_.push_back(node{.byte = {}, .cache = {}, .previous_max_match = {}});
+  for (const auto& byte: byte_sequence_) { byte_sequence_nodes_.push_back(node{.byte = byte, .cache = {}, .previous_max_match = {}}); }
+}
+
+/// KMP automaton step: returns the length of the longest prefix of the byte sequence that is a suffix of the input, given that
+/// before next_byte it was number_of_currently_matching_bytes. Results are memoized per (state, byte).
+/// The complete-match state (== sequence length) is accepted too and continues via the failure link.
+SplitContent::size_type SplitContent::ByteSequenceMatcher::getNumberOfMatchingBytes(const size_type number_of_currently_matching_bytes, const std::byte next_byte) {
+  // valid states are 0..byte_sequence_.size(), one node each
+  gsl_Assert(number_of_currently_matching_bytes < byte_sequence_nodes_.size());
+  auto& cache = byte_sequence_nodes_[number_of_currently_matching_bytes].cache;
+  if (const auto cached = cache.find(next_byte); cached != cache.end()) { return cached->second; }
+
+  size_type result = 0;
+  const bool can_extend = number_of_currently_matching_bytes + 1 < byte_sequence_nodes_.size();
+  if (can_extend && next_byte == byte_sequence_nodes_[number_of_currently_matching_bytes + 1].byte) {
+    result = number_of_currently_matching_bytes + 1;
+  } else if (number_of_currently_matching_bytes > 0) {
+    // fall back to the longest proper prefix that is also a suffix of the current match, then retry next_byte
+    result = getNumberOfMatchingBytes(getPreviousMaxMatch(number_of_currently_matching_bytes), next_byte);
+  }
+  // byte_sequence_nodes_ is never resized after construction, so `cache` is still valid after the recursion
+  cache[next_byte] = result;
+  return result;
+}
+
+/// KMP failure function: length of the longest proper prefix of the byte sequence that is also a suffix of its first
+/// number_of_currently_matching_bytes bytes. Memoized per state.
+SplitContent::size_type SplitContent::ByteSequenceMatcher::getPreviousMaxMatch(const size_type number_of_currently_matching_bytes) {
+  gsl_Assert(number_of_currently_matching_bytes < byte_sequence_nodes_.size());
+  auto& prev_max_match = byte_sequence_nodes_[number_of_currently_matching_bytes].previous_max_match;
+  if (prev_max_match) { return *prev_max_match; }
+  if (number_of_currently_matching_bytes <= 1) {
+    prev_max_match = 0;
+    return 0;
+  }
+  prev_max_match = getNumberOfMatchingBytes(getPreviousMaxMatch(number_of_currently_matching_bytes - 1), byte_sequence_nodes_[number_of_currently_matching_bytes].byte);
+  return *prev_max_match;
+}
+
+/// Splits one incoming FlowFile: every split goes to Splits, the unchanged input to Original. Yields when there is no input.
+void SplitContent::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  gsl_Assert(byte_sequence_matcher_);
+  const auto original = session.get();
+  if (!original) {
+    context.yield();
+    return;
+  }
+
+  const auto ff_content_stream = session.getFlowFileContentStream(*original);
+  if (!ff_content_stream) { throw Exception(PROCESSOR_EXCEPTION, fmt::format("Couldn't access the ContentStream of {}", original->getUUID().to_string())); }
+
+  Splitter splitter{session, original->getAttribute(core::SpecialFlowAttribute::FILENAME), *byte_sequence_matcher_, keep_byte_sequence, byte_sequence_location_};
+
+  // NOTE(review): one readByte() call per input byte; a chunked read would reduce per-byte stream overhead on large FlowFiles.
+  while (auto latest_byte = ff_content_stream->readByte()) {
+    splitter.digest(*latest_byte);
+    splitter.flushIfBufferTooLarge();
+  }
+  // explicit (not in ~Splitter) so that session failures propagate as exceptions instead of calling std::terminate
+  splitter.finish();
+
+  session.transfer(original, Original);
+}
+
+REGISTER_RESOURCE(SplitContent, Processor);
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/SplitContent.h b/extensions/standard-processors/processors/SplitContent.h
new file mode 100644
index 000000000..9c77fcfae
--- /dev/null
+++ b/extensions/standard-processors/processors/SplitContent.h
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <array>
+#include <cstddef>
+#include <memory>
+#include <optional>
+#include <span>
+#include <string_view>
+#include <unordered_map>
+#include <vector>
+
+#include "FlowFileRecord.h"
+#include "core/ProcessSession.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "core/RelationshipDefinition.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+/// Splits the content of each incoming FlowFile at every occurrence of a configured byte sequence.
+class SplitContent final : public core::Processor {
+ public:
+  explicit SplitContent(const std::string_view name, const utils::Identifier& uuid = {}) : Processor(name, uuid) {}
+
+  using size_type = std::vector<std::byte>::size_type;
+  enum class ByteSequenceFormat { Hexadecimal, Text };
+  enum class ByteSequenceLocation { Trailing, Leading };
+
+  EXTENSIONAPI static constexpr auto Description = "Splits incoming FlowFiles by a specified byte sequence";
+
+  EXTENSIONAPI static constexpr auto ByteSequenceFormatProperty =
+      core::PropertyDefinitionBuilder<2>::createProperty("Byte Sequence Format")
+          .withDescription("Specifies how the <Byte Sequence> property should be interpreted")
+          .isRequired(true)
+          .withDefaultValue(magic_enum::enum_name(ByteSequenceFormat::Hexadecimal))
+          .withAllowedValues(magic_enum::enum_names<ByteSequenceFormat>())
+          .build();
+
+  EXTENSIONAPI static constexpr auto ByteSequence =
+      core::PropertyDefinitionBuilder<>::createProperty("Byte Sequence")
+          .withDescription("A representation of bytes to look for and upon which to split the source file into separate files")
+          .isRequired(true)
+          .withPropertyType(core::StandardPropertyTypes::NON_BLANK_TYPE)
+          .build();
+
+  EXTENSIONAPI static constexpr auto KeepByteSequence =
+      core::PropertyDefinitionBuilder<>::createProperty("Keep Byte Sequence")
+          .withDescription("Determines whether or not the Byte Sequence should be included with each Split")
+          .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
+          .withDefaultValue("false")
+          .isRequired(true)
+          .build();
+
+  EXTENSIONAPI static constexpr auto ByteSequenceLocationProperty =
+      core::PropertyDefinitionBuilder<2>::createProperty("Byte Sequence Location")
+          .withDescription(
+              "If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; "
+              "if <Keep Byte Sequence> is false, this property is ignored.")
+          .withDefaultValue(magic_enum::enum_name(ByteSequenceLocation::Trailing))
+          .withAllowedValues(magic_enum::enum_names<ByteSequenceLocation>())
+          .isRequired(true)
+          .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ByteSequenceFormatProperty, ByteSequence, KeepByteSequence, ByteSequenceLocationProperty});
+
+  EXTENSIONAPI static constexpr auto Splits = core::RelationshipDefinition{"splits", "All Splits will be routed to the splits relationship"};
+  EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "The original file"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Original, Splits};
+
+  // These attributes are only written to the FlowFiles routed to Splits, so they declare that relationship.
+  EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute = core::OutputAttributeDefinition<1>{"fragment.identifier", {Splits},
+      "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"};
+  EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute = core::OutputAttributeDefinition<1>{"fragment.index", {Splits},
+      "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"};
+  EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = core::OutputAttributeDefinition<1>{"fragment.count", {Splits}, "The number of split FlowFiles generated from the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto SegmentOriginalFilenameOutputAttribute = core::OutputAttributeDefinition<1>{"segment.original.filename", {Splits}, "The filename of the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto OutputAttributes =
+      std::array<core::OutputAttributeReference, 4>{FragmentIdentifierOutputAttribute, FragmentIndexOutputAttribute, FragmentCountOutputAttribute, SegmentOriginalFilenameOutputAttribute};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  /// Size (in bytes) at which split data buffered in memory is appended to the split FlowFile.
+  static constexpr size_type BUFFER_TARGET_SIZE = 4096;
+
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+  void initialize() override;
+
+  /**
+   * Incremental matcher for one fixed byte sequence: a Knuth-Morris-Pratt automaton whose transitions are computed lazily
+   * and memoized. State n means "the last n input bytes equal the first n bytes of the sequence" (0 <= n <= sequence length).
+   * Lookups mutate the memoization caches without synchronization.
+   */
+  class ByteSequenceMatcher {
+   public:
+    using size_type = std::vector<std::byte>::size_type;
+    explicit ByteSequenceMatcher(std::vector<std::byte> byte_sequence);
+    /// Returns the state reached from state number_of_currently_matching_bytes after consuming next_byte.
+    size_type getNumberOfMatchingBytes(size_type number_of_currently_matching_bytes, std::byte next_byte);
+    /// Returns the length of the longest proper prefix of the sequence that is also a suffix of its first
+    /// number_of_currently_matching_bytes bytes (the KMP failure function).
+    size_type getPreviousMaxMatch(size_type number_of_currently_matching_bytes);
+    [[nodiscard]] std::span<const std::byte> getByteSequence() const { return byte_sequence_; }
+
+   private:
+    struct node {
+      std::byte byte;  // the sequence byte consumed to reach this state (unused for state 0)
+      std::unordered_map<std::byte, size_type> cache;  // memoized transitions: next byte -> next state
+      std::optional<size_type> previous_max_match;  // memoized getPreviousMaxMatch() result for this state
+    };
+    std::vector<node> byte_sequence_nodes_;  // one node per state 0..byte_sequence_.size()
+    const std::vector<std::byte> byte_sequence_;
+  };
+
+ private:
+  std::optional<ByteSequenceMatcher> byte_sequence_matcher_;
+  bool keep_byte_sequence = false;
+  ByteSequenceLocation byte_sequence_location_ = ByteSequenceLocation::Trailing;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SplitContent>::getLogger(uuid_);
+};
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/SplitContentTests.cpp b/extensions/standard-processors/tests/unit/SplitContentTests.cpp
new file mode 100644
index 000000000..32a1856ff
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/SplitContentTests.cpp
@@ -0,0 +1,597 @@
+/**
+*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FlowFileRecord.h"
+#include "catch2/generators/catch_generators.hpp"
+#include "processors/SplitContent.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::processors::test {
+
+template<typename... Bytes>
+std::vector<std::byte> createByteVector(Bytes...
bytes) { + return {static_cast<std::byte>(bytes)...}; +} + +TEST_CASE("WithoutByteSequence") { + const auto split_content = std::make_shared<SplitContent>("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading)); + + REQUIRE_THROWS_WITH(controller.trigger("rub-a-dub-dub"), "General Operation: Required property is empty: Byte Sequence"); +} + +TEST_CASE("EmptyFlowFile") { + const auto split_content = std::make_shared<SplitContent>("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + split_content->setProperty(SplitContent::ByteSequence, "ub"); + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading)); + + auto trigger_results = controller.trigger(""); + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.empty()); + + CHECK(controller.plan->getContent(original[0]).empty()); +} + +TEST_CASE("TextFormatLeadingPosition", "[NiFi]") { + const auto split_content = std::make_shared<SplitContent>("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + 
split_content->setProperty(SplitContent::ByteSequence, "ub"); + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading)); + + auto trigger_results = controller.trigger("rub-a-dub-dub"); + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 4); + + CHECK(controller.plan->getContent(original[0]) == "rub-a-dub-dub"); + + CHECK(controller.plan->getContent(splits[0]) == "r"); + CHECK(controller.plan->getContent(splits[1]) == "ub-a-d"); + CHECK(controller.plan->getContent(splits[2]) == "ub-d"); + CHECK(controller.plan->getContent(splits[3]) == "ub"); +} + +TEST_CASE("TextFormatTrailingPosition", "[NiFi]") { + const auto split_content = std::make_shared<SplitContent>("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + split_content->setProperty(SplitContent::ByteSequence, "ub"); + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing)); + + auto trigger_results = controller.trigger("rub-a-dub-dub"); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 3); + + CHECK(controller.plan->getContent(original[0]) == "rub-a-dub-dub"); + + CHECK(controller.plan->getContent(splits[0]) == "rub"); + CHECK(controller.plan->getContent(splits[1]) == "-a-dub"); + CHECK(controller.plan->getContent(splits[2]) == 
"-dub");
+}
+
+TEST_CASE("TextFormatSplits", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "test");
+
+  constexpr std::string_view input_1 = "This is a test. This is another test. And this is yet another test. Finally this is the last Test.";  // trailing "Test." is capitalized, so it is not matched
+  constexpr std::string_view input_2 = "This is a test. This is another test. And this is yet another test. Finally this is the last test";  // ends exactly on a match
+
+  const auto [keep_byte_sequence, byte_sequence_location, input, expected_splits] = GENERATE_REF(
+      std::make_tuple("true", "Leading", input_1, std::vector<std::string_view>{"This is a ", "test. This is another ", "test. And this is yet another ", "test. Finally this is the last Test."}),
+      std::make_tuple("false", "Leading", input_1, std::vector<std::string_view>{"This is a ", ". This is another ", ". And this is yet another ", ". Finally this is the last Test."}),
+      std::make_tuple("true", "Trailing", input_1, std::vector<std::string_view>{"This is a test", ". This is another test", ". And this is yet another test", ". Finally this is the last Test."}),
+      std::make_tuple("false", "Trailing", input_1, std::vector<std::string_view>{"This is a ", ". This is another ", ". And this is yet another ", ". Finally this is the last Test."}),
+      std::make_tuple("true", "Leading", input_2, std::vector<std::string_view>{"This is a ", "test. This is another ", "test. And this is yet another ", "test. Finally this is the last ", "test"}),
+      std::make_tuple("true", "Trailing", input_2, std::vector<std::string_view>{"This is a test", ". This is another test", ". And this is yet another test", ". Finally this is the last test"}));
+
+  split_content->setProperty(SplitContent::KeepByteSequence, keep_byte_sequence);
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, byte_sequence_location);
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == expected_splits.size());
+
+  CHECK(controller.plan->getContent(original[0]) == input);
+
+  for (size_t i = 0; i < expected_splits.size(); ++i) {
+    auto split_i = controller.plan->getContent(splits[i]);
+    auto expected_i = expected_splits[i];
+    CHECK(split_i == expected_i);
+  }
+}
+
+TEST_CASE("SmallSplits", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "FFFF");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 0xFF, 0xFF, 0xFF, 5, 4, 3, 2, 1);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5);
+  const auto expected_split_2 = createByteVector(0xFF, 5, 4, 3, 2, 1);  // only the first two 0xFF bytes form the match
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("WithSingleByteSplit", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "FF");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 0xFF, 5, 4, 3, 2, 1);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  const auto original = trigger_results.at(processors::SplitContent::Original);
+  const auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5);
+  const auto expected_split_2 = createByteVector(5, 4, 3, 2, 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("WithLargerSplit", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 4, 3, 2, 1);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  const auto original = trigger_results.at(processors::SplitContent::Original);
+  const auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  const auto expected_split_1 = createByteVector(1, 2, 3, 4);
+  const auto expected_split_2 = createByteVector(5, 5, 4, 3, 2, 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("KeepingSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 4, 3, 2, 1);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  const auto original = trigger_results.at(processors::SplitContent::Original);
+  const auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5, 5, 5, 5);
+  const auto expected_split_2 = createByteVector(5, 5, 4, 3, 2, 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("EndsWithSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 1);  // a match at the very end must not yield an empty trailing split
+
+  auto expected_split = createByteVector(1, 2, 3, 4);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split);
+}
+
+TEST_CASE("EndsWithSequenceAndKeepSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 1);
+
+  auto expected_split_1 = createByteVector(1, 2, 3, 4, 5, 5, 5, 5);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+}
+
+TEST_CASE("StartsWithSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(5, 5, 5, 5, 1, 2, 3, 4);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 1);  // a match at the very start must not yield an empty leading split
+
+  auto expected_split = createByteVector(1, 2, 3, 4);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split);
+}
+
+TEST_CASE("StartsWithSequenceAndKeepTrailingSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, "Trailing");
+
+  const auto input_data = createByteVector(5, 5, 5, 5, 1, 2, 3, 4);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  auto expected_split_1 = createByteVector(5, 5, 5, 5);
+  auto expected_split_2 = createByteVector(1, 2, 3, 4);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("StartsWithSequenceAndKeepLeadingSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, "Leading");
+
+  const auto input_data = createByteVector(5, 5, 5, 5, 1, 2, 3, 4);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == input_data);
+}
+
+TEST_CASE("StartsWithDoubleSequenceAndKeepLeadingSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, "Leading");
+
+  const auto input_data = createByteVector(5, 5, 5, 5, 5, 5, 5, 5, 1, 2, 3, 4);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  auto expected_split_1 = createByteVector(5, 5, 5, 5);
+  auto expected_split_2 = createByteVector(5, 5, 5, 5, 1, 2, 3, 4);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("NoSplitterInString", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, ",");
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  constexpr std::string_view input = "UVAT";
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(splits.size() == 1);
+  REQUIRE(original.size() == 1);
+
+  CHECK(splits[0]->getAttribute("fragment.identifier").has_value());
+  CHECK(splits[0]->getAttribute("segment.original.filename").has_value());
+
+  CHECK(splits[0]->getAttribute("fragment.count").value() == "1");
+  CHECK(splits[0]->getAttribute("fragment.index").value() == "1");
+
+  CHECK(controller.plan->getContent(splits[0]) == input);
+  CHECK(controller.plan->getContent(original[0]) == input);
+}
+
+TEST_CASE("ByteSequenceAtBufferTargetSize", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  auto x = SplitContent::BUFFER_TARGET_SIZE-10;  // just below BUFFER_TARGET_SIZE, so parts straddle that boundary
+
+  auto [pre_fix_size, separator_size, post_fix_size] = GENERATE_COPY(
+      std::make_tuple(x, x, x),
+      std::make_tuple(10, 10, x),
+      std::make_tuple(10, x, 10),
+      std::make_tuple(x, 10, 10),
+      std::make_tuple(10, x, x),
+      std::make_tuple(x, 10, x),
+      std::make_tuple(x, x, 10),
+      std::make_tuple(2*x, x, 10));
+
+
+  const std::string pre_fix = utils::string::repeat("a", pre_fix_size);
+  const std::string separator = utils::string::repeat("b", separator_size);
+  const std::string post_fix = utils::string::repeat("c", post_fix_size);
+
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, separator);
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, "Trailing");
+
+  auto input = pre_fix + separator + post_fix;
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(splits.size() == 2);
+  REQUIRE(original.size() == 1);
+
+  CHECK(controller.plan->getContent(splits[0]) == pre_fix + separator);
+  CHECK(controller.plan->getContent(splits[1]) == post_fix);
+
+  CHECK(controller.plan->getContent(original[0]) == input);
+}
+
+TEST_CASE("TrickyWithLeading", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "aab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  auto trigger_results = controller.trigger("aaabc");  // the match starts at index 1, after a false start at index 0
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContent(original[0]) == "aaabc");
+
+  CHECK(controller.plan->getContent(splits[0]) == "a");
+  CHECK(controller.plan->getContent(splits[1]) == "aabc");
+}
+
+TEST_CASE("TrickyWithTrailing", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "aab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("aaabc");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContent(original[0]) == "aaabc");
+
+  CHECK(controller.plan->getContent(splits[0]) == "aaab");
+  CHECK(controller.plan->getContent(splits[1]) == "c");
+}
+
+TEST_CASE("TrickierWithLeading", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "abcd");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  auto trigger_results = controller.trigger("abcabcabcdabc");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContent(original[0]) == "abcabcabcdabc");
+
+  CHECK(controller.plan->getContent(splits[0]) == "abcabc");
+  CHECK(controller.plan->getContent(splits[1]) == "abcdabc");
+}
+
+TEST_CASE("TrickierWithTrailing", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "abcd");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("abcabcabcdabc");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContent(original[0]) == "abcabcabcdabc");
+
+  CHECK(controller.plan->getContent(splits[0]) == "abcabcabcd");
+  CHECK(controller.plan->getContent(splits[1]) == "abc");
+}
+
+TEST_CASE("OnlyByteSequencesNoKeep", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("ababab");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.empty());  // every split would be empty, so none may be emitted
+}
+
+TEST_CASE("OnlyByteSequencesTrailing", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("ababab");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 3);
+
+  CHECK(controller.plan->getContent(splits[0]) == "ab");
+  CHECK(controller.plan->getContent(splits[1]) == "ab");
+  CHECK(controller.plan->getContent(splits[2]) == "ab");
+}
+
+TEST_CASE("OnlyByteSequencesLeading", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  auto trigger_results = controller.trigger("ababab");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 3);
+
+  CHECK(controller.plan->getContent(splits[0]) == "ab");
+  CHECK(controller.plan->getContent(splits[1]) == "ab");
+  CHECK(controller.plan->getContent(splits[2]) == "ab");
+}
+}  // namespace org::apache::nifi::minifi::processors::test
diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h
index f18c5655e..ad24d6908 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -81,6 +81,14 @@ class InputStream : public virtual Stream {
 
     return sizeof(Integral);
   }
+
+  std::optional<std::byte> readByte() {  // reads one byte; std::nullopt when read() does not deliver exactly one byte (end of stream or error)
+    std::array<std::byte, 1> buf{};
+    if (read(buf) != 1) {
+      return std::nullopt;
+    }
+    return buf[0];
+  }
 };
 
 }  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1d62e3d3a..79c02ff63 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -333,6 +333,7 @@ void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_fi
   appendBuffer(flow_file, as_bytes(buffer));
 }
 void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_file, std::span<const std::byte> buffer) {
+  if (buffer.empty()) { return; }  // nothing to write: skip append() entirely
   append(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& output_stream) {
     const auto write_status = output_stream->write(buffer);
     return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp
index b83c1509c..a580859f0 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -672,12 +672,20 @@ void TestPlan::validateAnnotations() const {
   }
 }
 
+std::vector<std::byte> TestPlan::getContentAsBytes(const core::FlowFile& flow_file) const {
+  const auto content_claim = flow_file.getResourceClaim();
+  const auto content_stream = content_repo_->read(*content_claim);
+  const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+  minifi::internal::pipe(*content_stream, *output_stream);
+  return ranges::to<std::vector>(output_stream->getBuffer().subspan(flow_file.getOffset(), flow_file.getSize()));  // only this flow file's [offset, offset + size) window of the claim
+}
+
 std::string TestPlan::getContent(const minifi::core::FlowFile& file) const {
-  auto content_claim = file.getResourceClaim();
-  auto content_stream = content_repo_->read(*content_claim);
-  auto output_stream = std::make_shared<minifi::io::BufferStream>();
-  minifi::InputStreamPipe{*output_stream}(content_stream);
-  return utils::span_to<std::string>(minifi::utils::as_span<const char>(output_stream->getBuffer()).subspan(file.getOffset(), file.getSize()));
+  const auto content_claim = file.getResourceClaim();
+  const auto content_stream = content_repo_->read(*content_claim);
+  const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+  minifi::internal::pipe(*content_stream, *output_stream);
+  return utils::span_to<std::string>(minifi::utils::as_span<const char>(output_stream->getBuffer()).subspan(file.getOffset(), file.getSize()));  // keep the offset/size slicing of the previous version
 }
 
 TestController::TestController()
diff --git a/libminifi/test/libtest/unit/TestBase.h b/libminifi/test/libtest/unit/TestBase.h
index 05826b2b6..94c90e9be 100644
--- a/libminifi/test/libtest/unit/TestBase.h
+++ b/libminifi/test/libtest/unit/TestBase.h
@@ -285,6 +285,7 @@ class TestPlan {
     return state_storage_;
   }
 
+  std::vector<std::byte> getContentAsBytes(const core::FlowFile& flow_file) const;
   std::string getContent(const std::shared_ptr<const minifi::core::FlowFile>& file) const { return getContent(*file); }
   std::string getContent(const minifi::core::FlowFile& file) const;
