This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 24e19a324f76cc8ef92ac0a5c93308a2e53ee7e8
Author: Martin Zink <[email protected]>
AuthorDate: Mon Oct 28 15:59:33 2024 +0100

    MINIFICPP-2471 SegmentContent
    
    Closes #1879
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |  31 +++
 README.md                                          |   2 +-
 .../processors/SegmentContent.cpp                  | 108 +++++++++
 .../processors/SegmentContent.h                    |  78 ++++++
 .../tests/unit/SegmentContentTests.cpp             | 263 +++++++++++++++++++++
 5 files changed, 481 insertions(+), 1 deletion(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 0a637c31c..9a09eae09 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -2810,6 +2810,37 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | success | All files, containing log events, are routed to success |
 
 
+## SegmentContent
+
+### Description
+
+Segments a FlowFile into multiple smaller segments on byte boundaries.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name             | Default Value | Allowable Values | Description |
+|------------------|---------------|------------------|--------------------------------------------------------------------------------------------|
+| **Segment Size** |               |                  | The maximum data size in bytes for each segment<br/>**Supports Expression Language: true** |
+
+### Relationships
+
+| Name     | Description |
+|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| original | The original FlowFile will be sent to this relationship |
+| segments | All segments will be sent to this relationship. If the file was small enough that it was not segmented, a copy of the original is sent to this relationship as well as original |
+
+### Output Attributes
+
+| Attribute                 | Relationship | Description |
+|---------------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------|
+| fragment.identifier       |              | All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute |
+| fragment.index            |              | A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile |
+| fragment.count            |              | The number of segments generated from the parent FlowFile |
+| segment.original.filename |              | The filename of the parent FlowFile |
+
+
 ## SplitContent
 
 ### Description
diff --git a/README.md b/README.md
index a79023fd8..8285e4b4d 100644
--- a/README.md
+++ b/README.md
@@ -65,7 +65,7 @@ The following table lists the base set of processors.
 
 | Extension Set | Processors                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 
|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| **Base**      | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[AttributesToJSON](PROCESSORS.md#attributestojson)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[InvokeH
 [...]
+| **Base**      | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[AttributesToJSON](PROCESSORS.md#attributestojson)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[InvokeH
 [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. 
Extensions that are enabled by default ( such as RocksDB ), can be disabled 
with the respective CMAKE flag on the command line.
 
diff --git a/extensions/standard-processors/processors/SegmentContent.cpp 
b/extensions/standard-processors/processors/SegmentContent.cpp
new file mode 100644
index 000000000..d8fbe7643
--- /dev/null
+++ b/extensions/standard-processors/processors/SegmentContent.cpp
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SegmentContent.h"
+
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Upper bound on the number of bytes requested from the content stream per read.
+// onTrigger additionally caps each read at the space left in the current segment.
+constexpr size_t BUFFER_TARGET_SIZE{1024};
+
+/// Registers the "Segment Size" property and the original/segments relationships.
+void SegmentContent::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+/// No scheduling-time setup: "Segment Size" supports Expression Language, so it is
+/// evaluated against each incoming FlowFile in onTrigger instead.
+void SegmentContent::onSchedule(core::ProcessContext& /*context*/, core::ProcessSessionFactory& /*session_factory*/) {
+}
+
+namespace {
+/// Stamps fragment.count, fragment.index (1-based), fragment.identifier and
+/// segment.original.filename on every segment, then transfers it to Segments.
+///
+/// @param session  the session the segments were created in
+/// @param segments the segments in content order; the position determines fragment.index
+/// @param original the FlowFile the segments were cut from
+void updateSplitAttributesAndTransfer(core::ProcessSession& session, const std::vector<std::shared_ptr<core::FlowFile>>& segments, const core::FlowFile& original) {
+  // Use the parent's UUID as the shared identifier and only generate a fresh one when it is missing.
+  // (optional::value_or() evaluates its argument eagerly, which generated a throw-away UUID on every call.)
+  const auto original_uuid = original.getAttribute(core::SpecialFlowAttribute::UUID);
+  const std::string fragment_identifier = original_uuid ? *original_uuid : utils::IdGenerator::getIdGenerator()->generate().to_string();
+  const std::string original_filename = original.getAttribute(core::SpecialFlowAttribute::FILENAME).value_or("");
+  const std::string fragment_count = std::to_string(segments.size());
+
+  for (size_t segment_i = 0; segment_i < segments.size(); ++segment_i) {
+    const auto& segment = segments[segment_i];
+    segment->setAttribute(SegmentContent::FragmentCountOutputAttribute.name, fragment_count);
+    segment->setAttribute(SegmentContent::FragmentIndexOutputAttribute.name, std::to_string(segment_i + 1));  // One based indexing
+    segment->setAttribute(SegmentContent::FragmentIdentifierOutputAttribute.name, fragment_identifier);
+    segment->setAttribute(SegmentContent::SegmentOriginalFilenameOutputAttribute.name, original_filename);
+    session.transfer(segment, SegmentContent::Segments);
+  }
+}
+}  // namespace
+
+/// Cuts the incoming FlowFile's content into consecutive segments of at most "Segment Size"
+/// bytes, routes the segments to Segments and the unchanged input to Original.
+///
+/// Empty content produces no segments; the input is still routed to Original.
+/// @throws Exception (PROCESSOR_EXCEPTION) when the segment size is missing, unparsable or zero,
+///         when the content stream cannot be opened, or when reading the content fails midway.
+void SegmentContent::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  const auto original = session.get();
+  if (!original) {
+    context.yield();
+    return;
+  }
+
+  size_t max_segment_size{};
+  const auto segment_size_str = context.getProperty(SegmentSize, original.get());
+  if (!segment_size_str || !core::DataSizeValue::StringToInt(*segment_size_str, max_segment_size) || max_segment_size == 0) {
+    throw Exception(PROCESSOR_EXCEPTION, fmt::format("Invalid Segment Size: '{}'", segment_size_str.value_or("")));
+  }
+
+  const auto ff_content_stream = session.getFlowFileContentStream(*original);
+  if (!ff_content_stream) {
+    throw Exception(PROCESSOR_EXCEPTION, fmt::format("Couldn't access the ContentStream of {}", original->getUUID().to_string()));
+  }
+
+  std::vector<std::byte> buffer;
+  std::vector<std::shared_ptr<core::FlowFile>> segments{};
+
+  size_t current_segment_size = 0;
+  bool needs_new_segment = true;
+  while (true) {
+    // Never request more than what fits into the current segment, so one read fills at most one segment.
+    const size_t segment_remaining_size = max_segment_size - current_segment_size;
+    buffer.resize(std::min(BUFFER_TARGET_SIZE, segment_remaining_size));
+    const size_t num_bytes_read = ff_content_stream->read(buffer);
+    if (io::isError(num_bytes_read)) {
+      // Transferring the segments created so far would emit truncated content with a wrong
+      // fragment.count, so fail the trigger instead. NOTE(review): this relies on the framework
+      // rolling back the session on exceptions, discarding the partial segments — confirm.
+      logger_->log_error("Error while reading from {}", original->getUUID().to_string());
+      throw Exception(PROCESSOR_EXCEPTION, fmt::format("Error while reading from {}", original->getUUID().to_string()));
+    }
+    if (num_bytes_read == 0) {  // No more data
+      break;
+    }
+    if (needs_new_segment) {
+      segments.push_back(session.create());
+      needs_new_segment = false;
+    }
+    buffer.resize(num_bytes_read);
+    session.appendBuffer(segments.back(), buffer);
+    current_segment_size += num_bytes_read;
+    if (current_segment_size >= max_segment_size) {  // Defensive >= (read shouldn't read larger than requested size)
+      needs_new_segment = true;
+      current_segment_size = 0;
+    }
+  }
+
+  updateSplitAttributesAndTransfer(session, segments, *original);
+  session.transfer(original, Original);
+}
+
+REGISTER_RESOURCE(SegmentContent, Processor);
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/SegmentContent.h 
b/extensions/standard-processors/processors/SegmentContent.h
new file mode 100644
index 000000000..76e6f083c
--- /dev/null
+++ b/extensions/standard-processors/processors/SegmentContent.h
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string_view>
+#include <utility>
+
+#include "FlowFileRecord.h"
+#include "core/ProcessSession.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/RelationshipDefinition.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+/// Processor that splits a FlowFile's content into byte segments of a configurable maximum size.
+/// Segments go to "segments"; the unmodified input goes to "original".
+class SegmentContent final : public core::Processor {
+ public:
+  explicit SegmentContent(const std::string_view name, const utils::Identifier& uuid = {})
+      : Processor(name, uuid) {
+  }
+
+  EXTENSIONAPI static constexpr auto Description = "Segments a FlowFile into multiple smaller segments on byte boundaries.";
+
+  // ---- Properties ----
+  EXTENSIONAPI static constexpr auto SegmentSize = core::PropertyDefinitionBuilder<>::createProperty("Segment Size")
+      .withDescription("The maximum data size in bytes for each segment")
+      .isRequired(true)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({SegmentSize});
+
+  // ---- Relationships ----
+  EXTENSIONAPI static constexpr auto Segments = core::RelationshipDefinition{"segments",
+      "All segments will be sent to this relationship. If the file was small enough that it was not segmented, "
+      "a copy of the original is sent to this relationship as well as original"};
+  EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original",
+      "The original FlowFile will be sent to this relationship"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array<core::RelationshipDefinition, 2>{Original, Segments};
+
+  // ---- Output attributes ----
+  EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.identifier", {},
+      "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"};
+  EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.index", {},
+      "A sequence number starting with 1 that indicates the ordering of the segments that were created from a single parent FlowFile"};
+  EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.count", {},
+      "The number of segments generated from the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto SegmentOriginalFilenameOutputAttribute = core::OutputAttributeDefinition<0>{"segment.original.filename", {},
+      "The filename of the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto OutputAttributes = std::to_array<core::OutputAttributeReference>({
+      FragmentIdentifierOutputAttribute,
+      FragmentIndexOutputAttribute,
+      FragmentCountOutputAttribute,
+      SegmentOriginalFilenameOutputAttribute});
+
+  // ---- Processor traits ----
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+
+ private:
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SegmentContent>::getLogger(uuid_);
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/SegmentContentTests.cpp 
b/extensions/standard-processors/tests/unit/SegmentContentTests.cpp
new file mode 100644
index 000000000..a85377284
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/SegmentContentTests.cpp
@@ -0,0 +1,263 @@
+/**
+*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <random>
+
+#include "FlowFileRecord.h"
+#include "catch2/generators/catch_generators.hpp"
+#include "processors/SegmentContent.h"
+#include "range/v3/algorithm/equal.hpp"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+#include "range/v3/algorithm/generate.hpp"
+
+namespace org::apache::nifi::minifi::processors::test {
+
+/// Returns n pseudo-random bytes. The engine is seeded from the system clock,
+/// so the data differs from run to run.
+std::vector<std::byte> generateRandomData(const size_t n) {
+  const auto seed = static_cast<uint16_t>(std::chrono::system_clock::now().time_since_epoch().count());
+  std::independent_bits_engine<std::default_random_engine, CHAR_BIT, uint16_t> random_bits{seed};
+  std::vector<std::byte> result(n);
+  for (auto& byte : result) {
+    byte = static_cast<std::byte>(random_bits());
+  }
+  return result;
+}
+
+/// Expected content of the zero-based segment `segment_i` when `original_content` is cut into
+/// pieces of `segment_size` characters; the last piece may be shorter.
+std::string_view calcExpectedSegment(const std::string_view original_content, const size_t segment_i, const size_t segment_size) {
+  // substr() clamps the count to the characters remaining after the offset,
+  // which is exactly what produces the shorter final segment.
+  return original_content.substr(segment_i * segment_size, segment_size);
+}
+
+/// Expected bytes of the zero-based segment `segment_i` when `original_content` is cut into
+/// pieces of `segment_size` bytes; the last piece may be shorter. An index at or past the
+/// end of the content yields an empty span.
+std::span<const std::byte> calcExpectedSegment(const std::span<const std::byte> original_content, const size_t segment_i, const size_t segment_size) {
+  const auto start_pos = segment_i * segment_size;
+  if (start_pos >= original_content.size()) {
+    // Unlike string_view::substr, span::subspan does not clamp: an offset/count past the end
+    // is undefined behaviour (the previous end_pos - start_pos would also underflow here).
+    return {};
+  }
+  return original_content.subspan(start_pos, std::min(segment_size, original_content.size() - start_pos));
+}
+
+/// Builds a std::byte vector from integer values, e.g. createByteVector(1, 2, 3).
+template<typename... Bytes>
+std::vector<std::byte> createByteVector(Bytes... bytes) {
+  std::vector<std::byte> result;
+  result.reserve(sizeof...(bytes));
+  (result.push_back(static_cast<std::byte>(bytes)), ...);
+  return result;
+}
+
+TEST_CASE("Invalid segmentSize tests") {
+  const auto segment_content = std::make_shared<SegmentContent>("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  // Setting the property is expected to succeed for any value; invalid sizes are only rejected
+  // (with a PROCESSOR_EXCEPTION) once a FlowFile is processed. REQUIRE_NOTHROW takes a single
+  // expression: a trailing message argument would just become the dead operand of a comma expression.
+  SECTION("foo") {
+    REQUIRE_NOTHROW(segment_content->setProperty(SegmentContent::SegmentSize, "foo"));
+    REQUIRE_THROWS_WITH(controller.trigger("bar"), "Processor Operation: Invalid Segment Size: 'foo'");
+  }
+  SECTION("10 foo") {
+    REQUIRE_NOTHROW(segment_content->setProperty(SegmentContent::SegmentSize, "10 foo"));
+    REQUIRE_NOTHROW(controller.trigger("bar"));
+  }
+  SECTION("0") {
+    REQUIRE_NOTHROW(segment_content->setProperty(SegmentContent::SegmentSize, "0"));
+    REQUIRE_THROWS_WITH(controller.trigger("bar"), "Processor Operation: Invalid Segment Size: '0'");
+  }
+  SECTION("10 MB") {
+    REQUIRE_NOTHROW(segment_content->setProperty(SegmentContent::SegmentSize, "10 MB"));
+    REQUIRE_NOTHROW(controller.trigger("bar"));
+  }
+}
+
+TEST_CASE("EmptyFlowFile") {
+  const auto segment_content = std::make_shared<SegmentContent>("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+  segment_content->setProperty(SegmentContent::SegmentSize, "10 B");
+
+  // Empty content yields no segments at all, but the original is still passed through.
+  auto trigger_results = controller.trigger("");
+  auto originals = trigger_results.at(SegmentContent::Original);
+  auto segments = trigger_results.at(SegmentContent::Segments);
+
+  REQUIRE(originals.size() == 1);
+  REQUIRE(segments.empty());
+
+  CHECK(controller.plan->getContent(originals[0]).empty());
+}
+
+TEST_CASE("SegmentContent with different sized text input") {
+  const auto segment_content = std::make_shared<SegmentContent>("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  // (input size, segment size): exact multiples, inputs with a remainder, 1-byte segments,
+  // and segment sizes on both sides of the processor's 1024-byte read buffer.
+  auto [original_size, segment_size] = GENERATE(
+    std::make_tuple(size_t{1020}, size_t{30}),
+    std::make_tuple(1020, 31),
+    std::make_tuple(1020, 1),
+    std::make_tuple(2000, 30),
+    std::make_tuple(2000, 1010),
+    std::make_tuple(2000, 1050),
+    std::make_tuple(100, 100),
+    std::make_tuple(99, 100),
+    std::make_tuple(100, 99));
+
+  segment_content->setProperty(SegmentContent::SegmentSize, std::to_string(segment_size));
+  const std::string original_content = utils::string::repeat("a", original_size);
+
+  auto trigger_results = controller.trigger(original_content);
+  auto original = trigger_results.at(SegmentContent::Original);
+  auto segments = trigger_results.at(SegmentContent::Segments);
+
+  // ceil(original_size / segment_size), computed in integer arithmetic
+  const size_t expected_segment_count = (original_size + segment_size - 1) / segment_size;
+  REQUIRE(segments.size() == expected_segment_count);
+  REQUIRE(original.size() == 1);
+
+  size_t total_length = 0;
+  for (size_t segment_i = 0; segment_i < expected_segment_count; ++segment_i) {
+    const auto segment_str = controller.plan->getContent(segments[segment_i]);
+    CHECK(segment_str == calcExpectedSegment(original_content, segment_i, segment_size));
+    total_length += segment_str.length();
+  }
+  CHECK(original_size == total_length);
+}
+
+TEST_CASE("SegmentContent with different sized byte input") {
+  const auto segment_content = std::make_shared<SegmentContent>("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  // Same (input size, segment size) matrix as the text test, but with random binary content.
+  auto [original_size, segment_size] = GENERATE(
+    std::make_tuple(size_t{1020}, size_t{30}),
+    std::make_tuple(1020, 31),
+    std::make_tuple(1020, 1),
+    std::make_tuple(2000, 30),
+    std::make_tuple(2000, 1010),
+    std::make_tuple(2000, 1050),
+    std::make_tuple(100, 100),
+    std::make_tuple(99, 100),
+    std::make_tuple(100, 99));
+
+  segment_content->setProperty(SegmentContent::SegmentSize, std::to_string(segment_size));
+  const auto input_data = generateRandomData(original_size);
+  const std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+  auto original = trigger_results.at(SegmentContent::Original);
+  auto segments = trigger_results.at(SegmentContent::Segments);
+
+  // ceil(original_size / segment_size), computed in integer arithmetic
+  const size_t expected_segment_count = (original_size + segment_size - 1) / segment_size;
+  REQUIRE(segments.size() == expected_segment_count);
+  REQUIRE(original.size() == 1);
+
+  size_t total_length = 0;
+  for (size_t segment_i = 0; segment_i < expected_segment_count; ++segment_i) {
+    const auto segment_bytes = controller.plan->getContentAsBytes(*segments[segment_i]);
+    CHECK(ranges::equal(segment_bytes, calcExpectedSegment(input_data, segment_i, segment_size)));
+    total_length += segment_bytes.size();
+  }
+  CHECK(original_size == total_length);
+}
+
+TEST_CASE("SimpleTest", "[NiFi]") {
+  const auto segment_content = std::make_shared<SegmentContent>("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  segment_content->setProperty(SegmentContent::SegmentSize, "4 B");
+
+  // 9 bytes cut into 4-byte segments -> 4 + 4 + 1 bytes
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+  const std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+  const std::vector<std::vector<std::byte>> expected_segments{
+      createByteVector(1, 2, 3, 4),
+      createByteVector(5, 6, 7, 8),
+      createByteVector(9)};
+
+  auto trigger_results = controller.trigger(input, {{std::string{core::SpecialFlowAttribute::UUID}, "original_uuid"}});
+  auto original = trigger_results.at(SegmentContent::Original);
+  auto segments = trigger_results.at(SegmentContent::Segments);
+
+  REQUIRE(segments.size() == expected_segments.size());
+  REQUIRE(original.size() == 1);
+
+  CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data);
+
+  // Every segment carries the parent's filename and UUID, the total count and its own 1-based index.
+  const auto flowfile_filename = *original[0]->getAttribute(core::SpecialFlowAttribute::FILENAME);
+  for (size_t i = 0; i < segments.size(); ++i) {
+    const auto& segment = segments[i];
+    CHECK(controller.plan->getContentAsBytes(*segment) == expected_segments[i]);
+    CHECK(segment->getAttribute(SegmentContent::SegmentOriginalFilenameOutputAttribute.name) == flowfile_filename);
+    CHECK(segment->getAttribute(SegmentContent::FragmentIdentifierOutputAttribute.name) == "original_uuid");
+    CHECK(segment->getAttribute(SegmentContent::FragmentCountOutputAttribute.name) == "3");
+    CHECK(segment->getAttribute(SegmentContent::FragmentIndexOutputAttribute.name) == std::to_string(i + 1));
+  }
+}
+
+TEST_CASE("TransferSmall", "[NiFi]") {
+  const auto segment_content = std::make_shared<SegmentContent>("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  // A 9-byte input fits into one 4 KB segment, which must therefore be a full copy of the input.
+  segment_content->setProperty(SegmentContent::SegmentSize, "4 KB");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+  const std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+  auto original = trigger_results.at(SegmentContent::Original);
+  auto segments = trigger_results.at(SegmentContent::Segments);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(segments.size() == 1);
+
+  for (const auto& flow_file : {original[0], segments[0]}) {
+    CHECK(controller.plan->getContentAsBytes(*flow_file) == input_data);
+  }
+}
+
+TEST_CASE("ExpressionLanguageSupport", "[NiFi]") {
+  const auto segment_content = std::make_shared<SegmentContent>("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  // The segment size comes from the incoming FlowFile's "segmentSize" attribute.
+  segment_content->setProperty(SegmentContent::SegmentSize, "${segmentSize}");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+  const std::string_view input(reinterpret_cast<const char*>(input_data.data()), input_data.size());
+  const std::vector<std::vector<std::byte>> expected_segments{
+      createByteVector(1, 2, 3, 4),
+      createByteVector(5, 6, 7, 8),
+      createByteVector(9)};
+
+  auto trigger_results = controller.trigger(input, {{"segmentSize", "4 B"}});
+  auto original = trigger_results.at(SegmentContent::Original);
+  auto segments = trigger_results.at(SegmentContent::Segments);
+
+  REQUIRE(segments.size() == expected_segments.size());
+  REQUIRE(original.size() == 1);
+
+  CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data);
+  for (size_t i = 0; i < expected_segments.size(); ++i) {
+    CHECK(controller.plan->getContentAsBytes(*segments[i]) == expected_segments[i]);
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::processors::test

Reply via email to