This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 24e19a324f76cc8ef92ac0a5c93308a2e53ee7e8 Author: Martin Zink <[email protected]> AuthorDate: Mon Oct 28 15:59:33 2024 +0100 MINIFICPP-2471 SegmentContent Closes #1879 Signed-off-by: Marton Szasz <[email protected]> --- PROCESSORS.md | 31 +++ README.md | 2 +- .../processors/SegmentContent.cpp | 108 +++++++++ .../processors/SegmentContent.h | 78 ++++++ .../tests/unit/SegmentContentTests.cpp | 263 +++++++++++++++++++++ 5 files changed, 481 insertions(+), 1 deletion(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 0a637c31c..9a09eae09 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -2810,6 +2810,37 @@ In the list below, the names of required properties appear in bold. Any other pr | success | All files, containing log events, are routed to success | +## SegmentContent + +### Description + +Segments a FlowFile into multiple smaller segments on byte boundaries. + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|------------------|---------------|------------------|--------------------------------------------------------------------------------------------| +| **Segment Size** | | | The maximum data size in bytes for each segment<br/>**Supports Expression Language: true** | + +### Relationships + +| Name | Description | +|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| original | The original FlowFile will be sent to this relationship | +| segments | All segments will be sent to this relationship. If the file was small enough that it was not segmented, a copy of the original is sent to this relationship as well as original | + +### Output Attributes + +| Attribute | Relationship | Description | +|---------------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------| +| fragment.identifier | | All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute | +| fragment.index | | A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile | +| fragment.count | | The number of segments generated from the parent FlowFile | +| segment.original.filename | | The filename of the parent FlowFile | + + ## SplitContent ### Description diff --git a/README.md b/README.md index a79023fd8..8285e4b4d 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ The following table lists the base set of processors. | Extension Set | Processors [...] |---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...] -| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[AttributesToJSON](PROCESSORS.md#attributestojson)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[InvokeH [...] +| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[AttributesToJSON](PROCESSORS.md#attributestojson)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[InvokeH [...] The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as RocksDB ), can be disabled with the respective CMAKE flag on the command line. diff --git a/extensions/standard-processors/processors/SegmentContent.cpp b/extensions/standard-processors/processors/SegmentContent.cpp new file mode 100644 index 000000000..d8fbe7643 --- /dev/null +++ b/extensions/standard-processors/processors/SegmentContent.cpp @@ -0,0 +1,108 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SegmentContent.h" + +#include "core/FlowFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::processors { + +constexpr size_t BUFFER_TARGET_SIZE = 1024; + +void SegmentContent::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void SegmentContent::onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) {} + +namespace { +void updateSplitAttributesAndTransfer(core::ProcessSession& session, const std::vector<std::shared_ptr<core::FlowFile>>& splits, const core::FlowFile& original) { + const std::string fragment_identifier_ = original.getAttribute(core::SpecialFlowAttribute::UUID).value_or(utils::IdGenerator::getIdGenerator()->generate().to_string()); + const auto original_filename = original.getAttribute(core::SpecialFlowAttribute::FILENAME).value_or(""); + for (size_t split_i = 0; split_i < splits.size(); ++split_i) { + const auto& split = splits[split_i]; + split->setAttribute(SegmentContent::FragmentCountOutputAttribute.name, std::to_string(splits.size())); + split->setAttribute(SegmentContent::FragmentIndexOutputAttribute.name, std::to_string(split_i + 1)); // One based indexing + split->setAttribute(SegmentContent::FragmentIdentifierOutputAttribute.name, fragment_identifier_); + split->setAttribute(SegmentContent::SegmentOriginalFilenameOutputAttribute.name, original_filename); + session.transfer(split, SegmentContent::Segments); + } +} +} // namespace + +void SegmentContent::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto original = session.get(); + if (!original) { + context.yield(); + return; + } + + size_t max_segment_size{}; + const auto segment_size_str = context.getProperty(SegmentSize, original.get()); + if (!segment_size_str || !core::DataSizeValue::StringToInt(*segment_size_str, max_segment_size) || max_segment_size == 0) { + throw Exception(PROCESSOR_EXCEPTION, fmt::format("Invalid Segment Size: '{}'", segment_size_str.value_or(""))); + } + + const auto ff_content_stream = session.getFlowFileContentStream(*original); + if (!ff_content_stream) { + throw Exception(PROCESSOR_EXCEPTION, fmt::format("Couldn't access the ContentStream of {}", original->getUUID().to_string())); + } + + std::vector<std::byte> buffer; + std::vector<std::shared_ptr<core::FlowFile>> segments{}; + + size_t current_segment_size = 0; + size_t num_bytes_read{}; + bool needs_new_segment = true; + while (true) { + const size_t segment_remaining_size = max_segment_size - current_segment_size; + const size_t buffer_size = std::min(BUFFER_TARGET_SIZE, segment_remaining_size); + buffer.resize(buffer_size); + num_bytes_read = ff_content_stream->read(buffer); + if (io::isError(num_bytes_read)) { + logger_->log_error("Error while reading from {}", original->getUUID().to_string()); + break; + } + if (num_bytes_read == 0) { // No more data + break; + } + if (needs_new_segment) { + segments.push_back(session.create()); + needs_new_segment = false; + } + buffer.resize(num_bytes_read); + session.appendBuffer(segments.back(), buffer); + current_segment_size += num_bytes_read; + if (current_segment_size >= max_segment_size) { // Defensive >= (read shouldn't read larger than requested size) + needs_new_segment = true; + current_segment_size = 0; + } + }; + + updateSplitAttributesAndTransfer(session, segments, *original); + session.transfer(original, Original); +} + +REGISTER_RESOURCE(SegmentContent, Processor); + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/SegmentContent.h b/extensions/standard-processors/processors/SegmentContent.h new file mode 100644 index 000000000..76e6f083c --- /dev/null +++ b/extensions/standard-processors/processors/SegmentContent.h @@ -0,0 +1,78 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <memory> +#include <optional> +#include <string_view> +#include <utility> + +#include "FlowFileRecord.h" +#include "core/ProcessSession.h" +#include "core/Processor.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/RelationshipDefinition.h" +#include "utils/Export.h" + +namespace org::apache::nifi::minifi::processors { + +class SegmentContent final : public core::Processor { + public: + explicit SegmentContent(const std::string_view name, const utils::Identifier& uuid = {}) : Processor(name, uuid) {} + + EXTENSIONAPI static constexpr auto Description = "Segments a FlowFile into multiple smaller segments on byte boundaries."; + + EXTENSIONAPI static constexpr auto SegmentSize = + core::PropertyDefinitionBuilder<>::createProperty("Segment Size") + .withDescription("The maximum data size in bytes for each segment") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + + EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({SegmentSize}); + + EXTENSIONAPI static constexpr auto Segments = core::RelationshipDefinition{ + "segments", "All segments will be sent to this relationship. If the file was small enough that it was not segmented, a copy of the original is sent to this relationship as well as original"}; + EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "The original FlowFile will be sent to this relationship"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Original, Segments}; + + EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute = + core::OutputAttributeDefinition<0>{"fragment.identifier", {}, "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"}; + EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute = + core::OutputAttributeDefinition<0>{"fragment.index", {}, "A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile"}; + EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.count", {}, "The number of segments generated from the parent FlowFile"}; + EXTENSIONAPI static constexpr auto SegmentOriginalFilenameOutputAttribute = core::OutputAttributeDefinition<0>{"segment.original.filename", {}, "The filename of the parent FlowFile"}; + EXTENSIONAPI static constexpr auto OutputAttributes = + std::to_array<core::OutputAttributeReference>({FragmentIdentifierOutputAttribute, FragmentIndexOutputAttribute, FragmentCountOutputAttribute, SegmentOriginalFilenameOutputAttribute}); + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + void initialize() override; + + private: + std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SegmentContent>::getLogger(uuid_); +}; + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/SegmentContentTests.cpp b/extensions/standard-processors/tests/unit/SegmentContentTests.cpp new file mode 100644 index 000000000..a85377284 --- /dev/null +++ b/extensions/standard-processors/tests/unit/SegmentContentTests.cpp @@ -0,0 +1,263 @@ +/** +* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <random> + +#include "FlowFileRecord.h" +#include "catch2/generators/catch_generators.hpp" +#include "processors/SegmentContent.h" +#include "range/v3/algorithm/equal.hpp" +#include "unit/Catch.h" +#include "unit/SingleProcessorTestController.h" +#include "unit/TestBase.h" +#include "range/v3/algorithm/generate.hpp" + +namespace org::apache::nifi::minifi::processors::test { + +std::vector<std::byte> generateRandomData(const size_t n) { + std::independent_bits_engine<std::default_random_engine, CHAR_BIT, uint16_t> rbe{gsl::narrow_cast<uint16_t>(std::chrono::system_clock::now().time_since_epoch().count())}; + std::vector<std::byte> bytes(n); + ranges::generate(bytes, [&]() { return static_cast<std::byte>(rbe()); }); + return bytes; +} + +std::string_view calcExpectedSegment(const std::string_view original_content, const size_t segment_i, const size_t segment_size) { + const auto start_pos = segment_i * segment_size; + const auto end_pos = std::min(start_pos + segment_size, original_content.length()); + const auto actual_size = std::min(segment_size, end_pos - start_pos); + return original_content.substr(segment_i * segment_size, std::min(segment_size, actual_size)); +} + +std::span<const std::byte> calcExpectedSegment(const std::span<const std::byte> original_content, const size_t segment_i, const size_t segment_size) { + const auto start_pos = segment_i * segment_size; + const auto end_pos = std::min(start_pos + segment_size, original_content.size()); + const auto actual_size = std::min(segment_size, end_pos - start_pos); + return original_content.subspan(segment_i * segment_size, std::min(segment_size, actual_size)); +} + +template<typename... Bytes> +std::vector<std::byte> createByteVector(Bytes... bytes) { + return {static_cast<std::byte>(bytes)...}; +} + +TEST_CASE("Invalid segmentSize tests") { + const auto segment_content = std::make_shared<SegmentContent>("SegmentContent"); + minifi::test::SingleProcessorTestController controller{segment_content}; + + SECTION("foo") { + REQUIRE_NOTHROW(segment_content->setProperty(SegmentContent::SegmentSize, "foo"), "General Operation: Segment Size value validation failed"); + REQUIRE_THROWS_WITH(controller.trigger("bar"), "Processor Operation: Invalid Segment Size: 'foo'"); + } + SECTION("10 foo") { + REQUIRE_NOTHROW(segment_content->setProperty(SegmentContent::SegmentSize, "10 foo"), "General Operation: Segment Size value validation failed"); + REQUIRE_NOTHROW(controller.trigger("bar")); + } + SECTION("0") { + REQUIRE_NOTHROW(segment_content->setProperty(SegmentContent::SegmentSize, "0"), "General Operation: Segment Size value validation failed"); + REQUIRE_THROWS_WITH(controller.trigger("bar"), "Processor Operation: Invalid Segment Size: '0'"); + } + SECTION("10 MB") { + REQUIRE_NOTHROW(segment_content->setProperty(SegmentContent::SegmentSize, "10 MB"), "General Operation: Segment Size value validation failed"); + REQUIRE_NOTHROW(controller.trigger("bar")); + } +} + +TEST_CASE("EmptyFlowFile") { + const auto split_content = std::make_shared<SegmentContent>("SegmentContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + split_content->setProperty(SegmentContent::SegmentSize, "10 B"); + + auto trigger_results = controller.trigger(""); + auto original = trigger_results.at(processors::SegmentContent::Original); + auto splits = trigger_results.at(processors::SegmentContent::Segments); + + REQUIRE(original.size() == 1); + REQUIRE(splits.empty()); + + CHECK(controller.plan->getContent(original[0]).empty()); +} + +TEST_CASE("SegmentContent with different sized text input") { + const auto segment_content = std::make_shared<SegmentContent>("SegmentContent"); + minifi::test::SingleProcessorTestController controller{segment_content}; + + auto [original_size, segment_size] = GENERATE( + std::make_tuple(size_t{1020}, size_t{30}), + std::make_tuple(1020, 31), + std::make_tuple(1020, 1), + std::make_tuple(2000, 30), + std::make_tuple(2000, 1010), + std::make_tuple(2000, 1050), + std::make_tuple(100, 100), + std::make_tuple(99, 100), + std::make_tuple(100, 99)); + + const std::string original_content = utils::string::repeat("a", original_size); + + segment_content->setProperty(SegmentContent::SegmentSize, std::to_string(segment_size)); + + auto trigger_results = controller.trigger(original_content); + + auto original = trigger_results.at(processors::SegmentContent::Original); + auto segments = trigger_results.at(processors::SegmentContent::Segments); + + auto expected_segment_size = gsl::narrow<size_t>(std::ceil(static_cast<double>(original_size) / static_cast<double>(segment_size))); + REQUIRE(segments.size() == expected_segment_size); + REQUIRE(original.size() == 1); + + size_t segment_size_sum = 0; + for (size_t segment_i = 0; segment_i < expected_segment_size; ++segment_i) { + auto segment_str = controller.plan->getContent(segments[segment_i]); + CHECK(segment_str == calcExpectedSegment(original_content, segment_i, segment_size)); + segment_size_sum += segment_str.length(); + } + CHECK(original_size == segment_size_sum); +} + +TEST_CASE("SegmentContent with different sized byte input") { + const auto segment_content = std::make_shared<SegmentContent>("SegmentContent"); + minifi::test::SingleProcessorTestController controller{segment_content}; + + auto [original_size, segment_size] = GENERATE( + std::make_tuple(size_t{1020}, size_t{30}), + std::make_tuple(1020, 31), + std::make_tuple(1020, 1), + std::make_tuple(2000, 30), + std::make_tuple(2000, 1010), + std::make_tuple(2000, 1050), + std::make_tuple(100, 100), + std::make_tuple(99, 100), + std::make_tuple(100, 99)); + + const auto input_data = generateRandomData(original_size); + std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size()); + + segment_content->setProperty(SegmentContent::SegmentSize, std::to_string(segment_size)); + + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SegmentContent::Original); + auto segments = trigger_results.at(processors::SegmentContent::Segments); + + auto expected_segment_size = gsl::narrow<size_t>(std::ceil(static_cast<double>(original_size) / static_cast<double>(segment_size))); + REQUIRE(segments.size() == expected_segment_size); + REQUIRE(original.size() == 1); + + size_t segment_size_sum = 0; + for (size_t segment_i = 0; segment_i < expected_segment_size; ++segment_i) { + auto segment_bytes = controller.plan->getContentAsBytes(*segments[segment_i]); + CHECK(ranges::equal(segment_bytes, calcExpectedSegment(input_data, segment_i, segment_size))); + segment_size_sum += segment_bytes.size(); + } + CHECK(original_size == segment_size_sum); +} + +TEST_CASE("SimpleTest", "[NiFi]") { + const auto segment_content = std::make_shared<SegmentContent>("SegmentContent"); + minifi::test::SingleProcessorTestController controller{segment_content}; + + segment_content->setProperty(SegmentContent::SegmentSize, "4 B"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9); + std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input, {{std::string{core::SpecialFlowAttribute::UUID}, "original_uuid"}}); + + auto original = trigger_results.at(processors::SegmentContent::Original); + auto segments = trigger_results.at(processors::SegmentContent::Segments); + + REQUIRE(segments.size() == 3); + REQUIRE(original.size() == 1); + + auto expected_segment_1 = createByteVector(1, 2, 3, 4); + auto expected_segment_2 = createByteVector(5, 6, 7, 8); + auto expected_segment_3 = createByteVector(9); + + CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data); + CHECK(controller.plan->getContentAsBytes(*segments[0]) == expected_segment_1); + CHECK(controller.plan->getContentAsBytes(*segments[1]) == expected_segment_2); + CHECK(controller.plan->getContentAsBytes(*segments[2]) == expected_segment_3); + + auto flowfile_filename = *original[0]->getAttribute(core::SpecialFlowAttribute::FILENAME); + + CHECK(segments[0]->getAttribute(SegmentContent::SegmentOriginalFilenameOutputAttribute.name) == flowfile_filename); + CHECK(segments[1]->getAttribute(SegmentContent::SegmentOriginalFilenameOutputAttribute.name) == flowfile_filename); + CHECK(segments[2]->getAttribute(SegmentContent::SegmentOriginalFilenameOutputAttribute.name) == flowfile_filename); + + CHECK(segments[0]->getAttribute(SegmentContent::FragmentIdentifierOutputAttribute.name) == "original_uuid"); + CHECK(segments[1]->getAttribute(SegmentContent::FragmentIdentifierOutputAttribute.name) == "original_uuid"); + CHECK(segments[2]->getAttribute(SegmentContent::FragmentIdentifierOutputAttribute.name) == "original_uuid"); + + CHECK(segments[0]->getAttribute(SegmentContent::FragmentCountOutputAttribute.name) == "3"); + CHECK(segments[1]->getAttribute(SegmentContent::FragmentCountOutputAttribute.name) == "3"); + CHECK(segments[2]->getAttribute(SegmentContent::FragmentCountOutputAttribute.name) == "3"); + + CHECK(segments[0]->getAttribute(SegmentContent::FragmentIndexOutputAttribute.name) == "1"); + CHECK(segments[1]->getAttribute(SegmentContent::FragmentIndexOutputAttribute.name) == "2"); + CHECK(segments[2]->getAttribute(SegmentContent::FragmentIndexOutputAttribute.name) == "3"); +} + +TEST_CASE("TransferSmall", "[NiFi]") { + const auto segment_content = std::make_shared<SegmentContent>("SegmentContent"); + minifi::test::SingleProcessorTestController controller{segment_content}; + + segment_content->setProperty(SegmentContent::SegmentSize, "4 KB"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9); + std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SegmentContent::Original); + auto segments = trigger_results.at(processors::SegmentContent::Segments); + + REQUIRE(segments.size() == 1); + REQUIRE(original.size() == 1); + + CHECK(controller.plan->getContentAsBytes(*segments[0]) == input_data); + CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data); +} + +TEST_CASE("ExpressionLanguageSupport", "[NiFi]") { + const auto segment_content = std::make_shared<SegmentContent>("SegmentContent"); + minifi::test::SingleProcessorTestController controller{segment_content}; + + segment_content->setProperty(SegmentContent::SegmentSize, "${segmentSize}"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9); + std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input, {{"segmentSize", "4 B"}}); + + auto original = trigger_results.at(processors::SegmentContent::Original); + auto segments = trigger_results.at(processors::SegmentContent::Segments); + + REQUIRE(segments.size() == 3); + REQUIRE(original.size() == 1); + + auto expected_segment_1 = createByteVector(1, 2, 3, 4); + auto expected_segment_2 = createByteVector(5, 6, 7, 8); + auto expected_segment_3 = createByteVector(9); + + CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data); + CHECK(controller.plan->getContentAsBytes(*segments[0]) == expected_segment_1); + CHECK(controller.plan->getContentAsBytes(*segments[1]) == expected_segment_2); + CHECK(controller.plan->getContentAsBytes(*segments[2]) == expected_segment_3); +} + +} // namespace org::apache::nifi::minifi::processors::test
