This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 624743dadcaff9d933f335995bccf1a430ddfea3
Author: Martin Zink <[email protected]>
AuthorDate: Mon Oct 28 15:50:02 2024 +0100

    MINIFICPP-2465 SplitContent
    
    Closes #1872
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |  35 ++
 README.md                                          |   6 +-
 .../processors/SplitContent.cpp                    | 224 ++++++++
 .../standard-processors/processors/SplitContent.h  | 130 +++++
 .../tests/unit/SplitContentTests.cpp               | 597 +++++++++++++++++++++
 libminifi/include/io/InputStream.h                 |   8 +
 libminifi/src/core/ProcessSession.cpp              |   1 +
 libminifi/test/libtest/unit/TestBase.cpp           |  18 +-
 libminifi/test/libtest/unit/TestBase.h             |   1 +
 9 files changed, 1012 insertions(+), 8 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index ff8488673..0a637c31c 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -93,6 +93,7 @@ limitations under the License.
 - [RetryFlowFile](#RetryFlowFile)
 - [RouteOnAttribute](#RouteOnAttribute)
 - [RouteText](#RouteText)
+- [SplitContent](#SplitContent)
 - [SplitText](#SplitText)
 - [TailEventLog](#TailEventLog)
 - [TailFile](#TailFile)
@@ -2809,6 +2810,40 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | success | All files, containing log events, are routed to success |
 
 
+## SplitContent
+
+### Description
+
+Splits incoming FlowFiles by a specified byte sequence
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                       | Default Value | Allowable Values     | 
Description                                                                     
                                                                                
                                                      |
+|----------------------------|---------------|----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Byte Sequence Format**   | Hexadecimal   | Hexadecimal<br/>Text | 
Specifies how the <Byte Sequence> property should be interpreted                
                                                                                
                                                      |
+| **Byte Sequence**          |               |                      | A 
representation of bytes to look for and upon which to split the source file 
into separate files                                                             
                                                        |
+| **Keep Byte Sequence**     | false         | true<br/>false       | 
Determines whether or not the Byte Sequence should be included with each Split  
                                                                                
                                                      |
+| **Byte Sequence Location** | Trailing      | Trailing<br/>Leading | If <Keep 
Byte Sequence> is set to true, specifies whether the byte sequence should be 
added to the end of the first split or the beginning of the second; if <Keep 
Byte Sequence> is false, this property is ignored. |
+
+### Relationships
+
+| Name     | Description                                          |
+|----------|------------------------------------------------------|
+| original | The original file                                    |
+| splits   | All Splits will be routed to the splits relationship |
+
+### Output Attributes
+
+| Attribute                 | Relationship | Description                       
                                                                                
             |
+|---------------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------|
+| fragment.identifier       |              | All split FlowFiles produced from 
the same parent FlowFile will have the same randomly generated UUID added for 
this attribute |
+| fragment.index            |              | A one-up number that indicates 
the ordering of the split FlowFiles that were created from a single parent 
FlowFile             |
+| fragment.count            |              | The number of split FlowFiles 
generated from the parent FlowFile                                              
                 |
+| segment.original.filename |              | The filename of the parent 
FlowFile                                                                        
                    |
+
+
 ## SplitText
 
 ### Description
diff --git a/README.md b/README.md
index b80108b8d..a79023fd8 100644
--- a/README.md
+++ b/README.md
@@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors:
 
 The following table lists the base set of processors.
 
-| Extension Set | Processors                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| **Base**      | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[AttributesToJSON](PROCESSORS.md#attributestojson)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[InvokeH
 [...]
+| Extension Set | Processors                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| **Base**      | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[AttributesToJSON](PROCESSORS.md#attributestojson)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[InvokeH
 [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. 
Extensions that are enabled by default ( such as RocksDB ), can be disabled 
with the respective CMAKE flag on the command line.
 
diff --git a/extensions/standard-processors/processors/SplitContent.cpp 
b/extensions/standard-processors/processors/SplitContent.cpp
new file mode 100644
index 000000000..cfcfb3416
--- /dev/null
+++ b/extensions/standard-processors/processors/SplitContent.cpp
@@ -0,0 +1,224 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SplitContent.h"
+
+#include <range/v3/view/split.hpp>
+
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::processors {
+void SplitContent::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void SplitContent::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  auto byte_sequence_str = 
utils::getRequiredPropertyOrThrow<std::string>(context, ByteSequence.name);
+  const auto byte_sequence_format = 
utils::parseEnumProperty<ByteSequenceFormat>(context, 
ByteSequenceFormatProperty);
+  std::vector<std::byte> byte_sequence{};
+  if (byte_sequence_format == ByteSequenceFormat::Hexadecimal) {
+    byte_sequence = utils::string::from_hex(byte_sequence_str);
+  } else {
+    byte_sequence.resize(byte_sequence_str.size());
+    std::ranges::transform(byte_sequence_str, byte_sequence.begin(), [](char 
c) { return static_cast<std::byte>(c); });
+  }
+  if (byte_sequence.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"Cannot operate without byte sequence"); }
+  
byte_sequence_matcher_.emplace(ByteSequenceMatcher(std::move(byte_sequence)));
+  byte_sequence_location_ = 
utils::parseEnumProperty<ByteSequenceLocation>(context, 
ByteSequenceLocationProperty);
+  keep_byte_sequence = utils::getRequiredPropertyOrThrow<bool>(context, 
KeepByteSequence.name);
+}
+
+namespace {
+class Splitter {
+ public:
+  explicit Splitter(core::ProcessSession& session, std::optional<std::string> 
original_filename, SplitContent::ByteSequenceMatcher& byte_sequence_matcher, 
const bool keep_byte_sequence,
+      const SplitContent::ByteSequenceLocation byte_sequence_location)
+      : session_(session),
+        original_filename_(std::move(original_filename)),
+        byte_sequence_matcher_(byte_sequence_matcher),
+        keep_trailing_byte_sequence_(keep_byte_sequence && 
byte_sequence_location == SplitContent::ByteSequenceLocation::Trailing),
+        keep_leading_byte_sequence_(keep_byte_sequence && 
byte_sequence_location == SplitContent::ByteSequenceLocation::Leading) {
+    data_before_byte_sequence_.reserve(SplitContent::BUFFER_TARGET_SIZE);
+  }
+
+  Splitter(const Splitter&) = delete;
+  Splitter& operator=(const Splitter&) = delete;
+  Splitter(Splitter&&) = delete;
+  Splitter& operator=(Splitter&&) = delete;
+
+  ~Splitter() {
+    flushRemainingData();
+    updateSplitAttributesAndTransfer();
+  }
+
+  void digest(const std::byte b) {
+    const auto prev_matching_bytes = matching_bytes_;
+    matching_bytes_ = 
byte_sequence_matcher_.getNumberOfMatchingBytes(matching_bytes_, b);
+    if (matchedByteSequence()) {
+      appendDataBeforeByteSequenceToSplit();
+      if (keep_trailing_byte_sequence_) { appendByteSequenceToSplit(); }
+      closeCurrentSplit();
+
+      // possible new split
+      if (keep_leading_byte_sequence_) { appendByteSequenceToSplit(); }
+      matching_bytes_ = 0;
+      return;
+    }
+    // matching grew, no need to grow data_before_byte_sequence
+    if (matching_bytes_ > prev_matching_bytes) { return; }
+
+    if (matching_bytes_ > 0) {
+      // last byte could be part of the byte_sequence
+      std::copy_n(getByteSequence().begin(), prev_matching_bytes - 
matching_bytes_ + 1, std::back_inserter(data_before_byte_sequence_));
+    } else {
+      // last byte is not part of the byte_sequence
+      std::copy_n(getByteSequence().begin(), prev_matching_bytes - 
matching_bytes_, std::back_inserter(data_before_byte_sequence_));
+      data_before_byte_sequence_.push_back(b);
+    }
+  }
+
+  void flushIfBufferTooLarge() {
+    if (data_before_byte_sequence_.size() >= SplitContent::BUFFER_TARGET_SIZE) 
{ appendDataBeforeByteSequenceToSplit(); }
+  }
+
+ private:
+  void closeCurrentSplit() {
+    if (current_split_) {
+      completed_splits_.push_back(current_split_);
+      current_split_.reset();
+    }
+  }
+
+  void appendDataBeforeByteSequenceToSplit() {
+    if (data_before_byte_sequence_.empty()) { return; }
+    ensureCurrentSplit();
+    session_.appendBuffer(current_split_, std::span<const 
std::byte>(data_before_byte_sequence_.data(), 
data_before_byte_sequence_.size()));
+    data_before_byte_sequence_.clear();
+  }
+
+  void appendByteSequenceToSplit() {
+    ensureCurrentSplit();
+    session_.appendBuffer(current_split_, getByteSequence());
+  }
+
+  [[nodiscard]] std::span<const std::byte> getByteSequence() const { return 
byte_sequence_matcher_.getByteSequence(); }
+
+  void ensureCurrentSplit() {
+    if (!current_split_) { current_split_ = session_.create(); }
+  }
+
+  [[nodiscard]] bool matchedByteSequence() const { return matching_bytes_ == 
byte_sequence_matcher_.getByteSequence().size(); }
+
+  void flushRemainingData() {
+    if (current_split_ || !data_before_byte_sequence_.empty() || 
matching_bytes_ > 0) {
+      ensureCurrentSplit();
+      session_.appendBuffer(current_split_, std::span<const 
std::byte>(data_before_byte_sequence_.data(), 
data_before_byte_sequence_.size()));
+      session_.appendBuffer(current_split_, 
byte_sequence_matcher_.getByteSequence().subspan(0, matching_bytes_));
+      completed_splits_.push_back(current_split_);
+    }
+  }
+
+  void updateSplitAttributesAndTransfer() const {
+    const std::string fragment_identifier_ = 
utils::IdGenerator::getIdGenerator()->generate().to_string();
+    for (size_t split_i = 0; split_i < completed_splits_.size(); ++split_i) {
+      const auto& split = completed_splits_[split_i];
+      split->setAttribute(SplitContent::FragmentCountOutputAttribute.name, 
std::to_string(completed_splits_.size()));
+      split->setAttribute(SplitContent::FragmentIndexOutputAttribute.name, 
std::to_string(split_i + 1));  // One based indexing
+      
split->setAttribute(SplitContent::FragmentIdentifierOutputAttribute.name, 
fragment_identifier_);
+      
split->setAttribute(SplitContent::SegmentOriginalFilenameOutputAttribute.name, 
original_filename_.value_or(""));
+      session_.transfer(split, SplitContent::Splits);
+    }
+  }
+
+  core::ProcessSession& session_;
+  const std::optional<std::string> original_filename_;
+  SplitContent::ByteSequenceMatcher& byte_sequence_matcher_;
+  std::vector<std::byte> data_before_byte_sequence_;
+  std::shared_ptr<core::FlowFile> current_split_ = nullptr;
+  std::vector<std::shared_ptr<core::FlowFile>> completed_splits_;
+  SplitContent::size_type matching_bytes_ = 0;
+
+  const bool keep_trailing_byte_sequence_ = false;
+  const bool keep_leading_byte_sequence_ = false;
+};
+}  // namespace
+
+SplitContent::ByteSequenceMatcher::ByteSequenceMatcher(std::vector<std::byte> 
byte_sequence) : byte_sequence_(std::move(byte_sequence)) {
+  byte_sequence_nodes_.push_back(node{.byte = {}, .cache = {}, 
.previous_max_match = {}});
+  for (const auto& byte: byte_sequence_) { 
byte_sequence_nodes_.push_back(node{.byte = byte, .cache = {}, 
.previous_max_match = {}}); }
+}
+
+SplitContent::size_type 
SplitContent::ByteSequenceMatcher::getNumberOfMatchingBytes(const size_type 
number_of_currently_matching_bytes, const std::byte next_byte) {
+  gsl_Assert(number_of_currently_matching_bytes <= 
byte_sequence_nodes_.size());
+  auto& curr_go = 
byte_sequence_nodes_[number_of_currently_matching_bytes].cache;
+  if (curr_go.contains(next_byte)) { return curr_go.at(next_byte); }
+  if (next_byte == byte_sequence_nodes_[number_of_currently_matching_bytes + 
1].byte) {
+    curr_go[next_byte] = number_of_currently_matching_bytes + 1;
+    return number_of_currently_matching_bytes + 1;
+  }
+  if (number_of_currently_matching_bytes == 0) {
+    curr_go[next_byte] = 0;
+    return 0;
+  }
+
+  curr_go[next_byte] = 
getNumberOfMatchingBytes(getPreviousMaxMatch(number_of_currently_matching_bytes),
 next_byte);
+  return curr_go.at(next_byte);
+}
+
+SplitContent::size_type 
SplitContent::ByteSequenceMatcher::getPreviousMaxMatch(const size_type 
number_of_currently_matching_bytes) {
+  gsl_Assert(number_of_currently_matching_bytes <= 
byte_sequence_nodes_.size());
+  auto& prev_max_match = 
byte_sequence_nodes_[number_of_currently_matching_bytes].previous_max_match;
+  if (prev_max_match) { return *prev_max_match; }
+  if (number_of_currently_matching_bytes <= 1) {
+    prev_max_match = 0;
+    return 0;
+  }
+  prev_max_match = 
getNumberOfMatchingBytes(getPreviousMaxMatch(number_of_currently_matching_bytes 
- 1), byte_sequence_nodes_[number_of_currently_matching_bytes].byte);
+  return *prev_max_match;
+}
+
+void SplitContent::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  gsl_Assert(byte_sequence_matcher_);
+  const auto original = session.get();
+  if (!original) {
+    context.yield();
+    return;
+  }
+
+  const auto ff_content_stream = session.getFlowFileContentStream(*original);
+  if (!ff_content_stream) { throw Exception(PROCESSOR_EXCEPTION, 
fmt::format("Couldn't access the ContentStream of {}", 
original->getUUID().to_string())); }
+
+  Splitter splitter{session, 
original->getAttribute(core::SpecialFlowAttribute::FILENAME), 
*byte_sequence_matcher_, keep_byte_sequence, byte_sequence_location_};
+
+  while (auto latest_byte = ff_content_stream->readByte()) {
+    splitter.digest(*latest_byte);
+    splitter.flushIfBufferTooLarge();
+  }
+
+  session.transfer(original, Original);
+}
+
+REGISTER_RESOURCE(SplitContent, Processor);
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/SplitContent.h 
b/extensions/standard-processors/processors/SplitContent.h
new file mode 100644
index 000000000..9c77fcfae
--- /dev/null
+++ b/extensions/standard-processors/processors/SplitContent.h
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string_view>
+
+#include "FlowFileRecord.h"
+#include "core/ProcessSession.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "core/RelationshipDefinition.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class SplitContent final : public core::Processor {
+ public:
+  explicit SplitContent(const std::string_view name, const utils::Identifier& 
uuid = {}) : Processor(name, uuid) {}
+
+  using size_type = std::vector<std::byte>::size_type;
+  enum class ByteSequenceFormat { Hexadecimal, Text };
+  enum class ByteSequenceLocation { Trailing, Leading };
+
+  EXTENSIONAPI static constexpr auto Description = "Splits incoming FlowFiles 
by a specified byte sequence";
+
+  EXTENSIONAPI static constexpr auto ByteSequenceFormatProperty =
+      core::PropertyDefinitionBuilder<2>::createProperty("Byte Sequence 
Format")
+          .withDescription("Specifies how the <Byte Sequence> property should 
be interpreted")
+          .isRequired(true)
+          
.withDefaultValue(magic_enum::enum_name(ByteSequenceFormat::Hexadecimal))
+          .withAllowedValues(magic_enum::enum_names<ByteSequenceFormat>())
+          .build();
+
+  EXTENSIONAPI static constexpr auto ByteSequence =
+      core::PropertyDefinitionBuilder<>::createProperty("Byte Sequence")
+          .withDescription("A representation of bytes to look for and upon 
which to split the source file into separate files")
+          .isRequired(true)
+          .withPropertyType(core::StandardPropertyTypes::NON_BLANK_TYPE)
+          .build();
+
+  EXTENSIONAPI static constexpr auto KeepByteSequence =
+      core::PropertyDefinitionBuilder<>::createProperty("Keep Byte Sequence")
+          .withDescription("Determines whether or not the Byte Sequence should 
be included with each Split")
+          .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
+          .withDefaultValue("false")
+          .isRequired(true)
+          .build();
+
+  EXTENSIONAPI static constexpr auto ByteSequenceLocationProperty =
+      core::PropertyDefinitionBuilder<2>::createProperty("Byte Sequence 
Location")
+          .withDescription(
+              "If <Keep Byte Sequence> is set to true, specifies whether the 
byte sequence should be added to the end of the first split or the beginning of 
the second; "
+              "if <Keep Byte Sequence> is false, this property is ignored.")
+          
.withDefaultValue(magic_enum::enum_name(ByteSequenceLocation::Trailing))
+          .withAllowedValues(magic_enum::enum_names<ByteSequenceLocation>())
+          .isRequired(true)
+          .build();
+
+  EXTENSIONAPI static constexpr auto Properties = 
std::to_array<core::PropertyReference>({ByteSequenceFormatProperty, 
ByteSequence, KeepByteSequence, ByteSequenceLocationProperty});
+
+  EXTENSIONAPI static constexpr auto Splits = 
core::RelationshipDefinition{"splits", "All Splits will be routed to the splits 
relationship"};
+  EXTENSIONAPI static constexpr auto Original = 
core::RelationshipDefinition{"original", "The original file"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Original, 
Splits};
+
+  EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute =
+      core::OutputAttributeDefinition<0>{"fragment.identifier", {}, "All split 
FlowFiles produced from the same parent FlowFile will have the same randomly 
generated UUID added for this attribute"};
+  EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute =
+      core::OutputAttributeDefinition<0>{"fragment.index", {}, "A one-up 
number that indicates the ordering of the split FlowFiles that were created 
from a single parent FlowFile"};
+  EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = 
core::OutputAttributeDefinition<0>{"fragment.count", {}, "The number of split 
FlowFiles generated from the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto SegmentOriginalFilenameOutputAttribute = 
core::OutputAttributeDefinition<0>{"segment.original.filename", {}, "The 
filename of the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto OutputAttributes =
+      std::array<core::OutputAttributeReference, 
4>{FragmentIdentifierOutputAttribute, FragmentIndexOutputAttribute, 
FragmentCountOutputAttribute, SegmentOriginalFilenameOutputAttribute};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr auto InputRequirement = 
core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  static constexpr size_type BUFFER_TARGET_SIZE = 4096;
+
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
session_factory) override;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) 
override;
+  void initialize() override;
+
+  class ByteSequenceMatcher {
+   public:
+    using size_type = std::vector<std::byte>::size_type;
+    explicit ByteSequenceMatcher(std::vector<std::byte> byte_sequence);
+    size_type getNumberOfMatchingBytes(size_type 
number_of_currently_matching_bytes, std::byte next_byte);
+    size_type getPreviousMaxMatch(size_type 
number_of_currently_matching_bytes);
+    [[nodiscard]] std::span<const std::byte> getByteSequence() const { return 
byte_sequence_; }
+
+   private:
+    struct node {
+      std::byte byte;
+      std::unordered_map<std::byte, size_type> cache;
+      std::optional<size_type> previous_max_match;
+    };
+    std::vector<node> byte_sequence_nodes_;
+    const std::vector<std::byte> byte_sequence_;
+  };
+
+ private:
+  std::optional<ByteSequenceMatcher> byte_sequence_matcher_;
+  bool keep_byte_sequence = false;
+  ByteSequenceLocation byte_sequence_location_ = 
ByteSequenceLocation::Trailing;
+  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<SplitContent>::getLogger(uuid_);
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/SplitContentTests.cpp 
b/extensions/standard-processors/tests/unit/SplitContentTests.cpp
new file mode 100644
index 000000000..32a1856ff
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/SplitContentTests.cpp
@@ -0,0 +1,597 @@
+/**
+*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FlowFileRecord.h"
+#include "catch2/generators/catch_generators.hpp"
+#include "processors/SplitContent.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::processors::test {
+
+template<typename... Bytes>
+std::vector<std::byte> createByteVector(Bytes... bytes) {
+  return {static_cast<std::byte>(bytes)...};
+}
+
+TEST_CASE("WithoutByteSequence") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  REQUIRE_THROWS_WITH(controller.trigger("rub-a-dub-dub"), "General Operation: 
Required property is empty: Byte Sequence");
+}
+
+TEST_CASE("EmptyFlowFile") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ub");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  auto trigger_results = controller.trigger("");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.empty());
+
+  CHECK(controller.plan->getContent(original[0]).empty());
+}
+
+TEST_CASE("TextFormatLeadingPosition", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ub");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  auto trigger_results = controller.trigger("rub-a-dub-dub");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 4);
+
+  CHECK(controller.plan->getContent(original[0]) == "rub-a-dub-dub");
+
+  CHECK(controller.plan->getContent(splits[0]) == "r");
+  CHECK(controller.plan->getContent(splits[1]) == "ub-a-d");
+  CHECK(controller.plan->getContent(splits[2]) == "ub-d");
+  CHECK(controller.plan->getContent(splits[3]) == "ub");
+}
+
+TEST_CASE("TextFormatTrailingPosition", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ub");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("rub-a-dub-dub");
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 3);
+
+  CHECK(controller.plan->getContent(original[0]) == "rub-a-dub-dub");
+
+  CHECK(controller.plan->getContent(splits[0]) == "rub");
+  CHECK(controller.plan->getContent(splits[1]) == "-a-dub");
+  CHECK(controller.plan->getContent(splits[2]) == "-dub");
+}
+
+TEST_CASE("TextFormatSplits", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "test");
+
+  constexpr std::string_view input_1 = "This is a test. This is another test. 
And this is yet another test. Finally this is the last Test.";
+  constexpr std::string_view input_2 = "This is a test. This is another test. 
And this is yet another test. Finally this is the last test";
+
+  const auto [keep_byte_sequence, byte_sequence_location, input, 
expected_splits] = GENERATE_REF(
+      std::make_tuple("true", "Leading", input_1, 
std::vector<std::string_view>{"This is a ", "test. This is another ", "test. 
And this is yet another ", "test. Finally this is the last Test."}),
+      std::make_tuple("false", "Leading", input_1, 
std::vector<std::string_view>{"This is a ", ". This is another ", ". And this 
is yet another ", ". Finally this is the last Test."}),
+      std::make_tuple("true", "Trailing", input_1, 
std::vector<std::string_view>{"This is a test", ". This is another test", ". 
And this is yet another test", ". Finally this is the last Test."}),
+      std::make_tuple("false", "Trailing", input_1, 
std::vector<std::string_view>{"This is a ", ". This is another ", ". And this 
is yet another ", ". Finally this is the last Test."}),
+      std::make_tuple("true", "Leading", input_2, 
std::vector<std::string_view>{"This is a ", "test. This is another ", "test. 
And this is yet another ", "test. Finally this is the last ", "test"}),
+      std::make_tuple("true", "Trailing", input_2, 
std::vector<std::string_view>{"This is a test", ". This is another test", ". 
And this is yet another test", ". Finally this is the last test"}));
+
+  split_content->setProperty(SplitContent::KeepByteSequence, 
keep_byte_sequence);
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
byte_sequence_location);
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == expected_splits.size());
+
+  CHECK(controller.plan->getContent(original[0]) == input);
+
+  for (size_t i = 0; i < expected_splits.size(); ++i) {
+    auto split_i = controller.plan->getContent(splits[i]);
+    auto expected_i = expected_splits[i];
+    CHECK(split_i == expected_i);
+  }
+}
+
+TEST_CASE("SmallSplits", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "FFFF");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 0xFF, 0xFF, 0xFF, 5, 
4, 3, 2, 1);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5);
+  const auto expected_split_2 = createByteVector(0xFF, 5, 4, 3, 2, 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("WithSingleByteSplit", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "FF");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 0xFF, 5, 4, 3, 2, 1);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  const auto original = trigger_results.at(processors::SplitContent::Original);
+  const auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5);
+  const auto expected_split_2 = createByteVector(5, 4, 3, 2, 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("WithLargerSplit", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 4, 3, 
2, 1);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  const auto original = trigger_results.at(processors::SplitContent::Original);
+  const auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  const auto expected_split_1 = createByteVector(1, 2, 3, 4);
+  const auto expected_split_2 = createByteVector(5, 5, 4, 3, 2, 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("KeepingSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 4, 3, 
2, 1);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  const auto original = trigger_results.at(processors::SplitContent::Original);
+  const auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5, 5, 5, 5);
+  const auto expected_split_2 = createByteVector(5, 5, 4, 3, 2, 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("EndsWithSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 1);
+
+  auto expected_split = createByteVector(1, 2, 3, 4);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split);
+}
+
+TEST_CASE("EndsWithSequenceAndKeepSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 1);
+
+  auto expected_split_1 = createByteVector(1, 2, 3, 4, 5, 5, 5, 5);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+}
+
+TEST_CASE("StartsWithSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+
+  const auto input_data = createByteVector(5, 5, 5, 5, 1, 2, 3, 4);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 1);
+
+  auto expected_split = createByteVector(1, 2, 3, 4);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split);
+}
+
+TEST_CASE("StartsWithSequenceAndKeepTrailingSequence", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
"Trailing");
+
+  const auto input_data = createByteVector(5, 5, 5, 5, 1, 2, 3, 4);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  auto expected_split_1 = createByteVector(5, 5, 5, 5);
+  auto expected_split_2 = createByteVector(1, 2, 3, 4);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("StartsWithSequenceAndKeepLeadingSequence") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
"Leading");
+
+  const auto input_data = createByteVector(5, 5, 5, 5, 1, 2, 3, 4);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 1);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == input_data);
+}
+
+TEST_CASE("StartsWithDoubleSequenceAndKeepLeadingSequence") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequence, "05050505");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
"Leading");
+
+  const auto input_data = createByteVector(5, 5, 5, 5, 5, 5, 5, 5, 1, 2, 3, 4);
+  std::string_view input(reinterpret_cast<const char*>(input_data.data()), 
input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  auto expected_split_1 = createByteVector(5, 5, 5, 5);
+  auto expected_split_2 = createByteVector(5, 5, 5, 5, 1, 2, 3, 4);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1);
+  CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2);
+}
+
+TEST_CASE("NoSplitterInString", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, ",");
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  constexpr std::string_view input = "UVAT";
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(splits.size() == 1);
+  REQUIRE(original.size() == 1);
+
+  CHECK(splits[0]->getAttribute("fragment.identifier").has_value());
+  CHECK(splits[0]->getAttribute("segment.original.filename").has_value());
+
+  CHECK(splits[0]->getAttribute("fragment.count").value() == "1");
+  CHECK(splits[0]->getAttribute("fragment.index").value() == "1");
+
+  CHECK(controller.plan->getContent(splits[0]) == input);
+  CHECK(controller.plan->getContent(original[0]) == input);
+}
+
+TEST_CASE("ByteSequenceAtBufferTargetSize") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+
+  auto x = SplitContent::BUFFER_TARGET_SIZE-10;
+
+  auto [pre_fix_size, separator_size, post_fix_size] = GENERATE_COPY(
+    std::make_tuple(x, x, x),
+    std::make_tuple(10, 10, x),
+    std::make_tuple(10, x, 10),
+    std::make_tuple(10, 10, x),
+    std::make_tuple(10, x, x),
+    std::make_tuple(x, 10, x),
+    std::make_tuple(x, x, 10),
+    std::make_tuple(2*x, x, 10));
+
+
+  const std::string pre_fix = utils::string::repeat("a", pre_fix_size);
+  const std::string separator = utils::string::repeat("b", separator_size);
+  const std::string post_fix = utils::string::repeat("c", post_fix_size);
+
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, separator);
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
"Trailing");
+
+  auto input = pre_fix + separator + post_fix;
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(splits.size() == 2);
+  REQUIRE(original.size() == 1);
+
+  CHECK(controller.plan->getContent(splits[0]) == std::string(pre_fix) + 
std::string(separator));
+  CHECK(controller.plan->getContent(splits[1]) == post_fix);
+
+  CHECK(controller.plan->getContent(original[0]) == input);
+}
+
+TEST_CASE("TrickyWithLeading", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "aab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  auto trigger_results = controller.trigger("aaabc");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContent(original[0]) == "aaabc");
+
+  CHECK(controller.plan->getContent(splits[0]) == "a");
+  CHECK(controller.plan->getContent(splits[1]) == "aabc");
+}
+
+TEST_CASE("TrickyWithTrailing", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "aab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("aaabc");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContent(original[0]) == "aaabc");
+
+  CHECK(controller.plan->getContent(splits[0]) == "aaab");
+  CHECK(controller.plan->getContent(splits[1]) == "c");
+}
+
+TEST_CASE("TrickierWithLeading", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "abcd");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  auto trigger_results = controller.trigger("abcabcabcdabc");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContent(original[0]) == "abcabcabcdabc");
+
+  CHECK(controller.plan->getContent(splits[0]) == "abcabc");
+  CHECK(controller.plan->getContent(splits[1]) == "abcdabc");
+}
+
+TEST_CASE("TrickierWithTrailing", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "abcd");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("abcabcabcdabc");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 2);
+
+  CHECK(controller.plan->getContent(original[0]) == "abcabcabcdabc");
+
+  CHECK(controller.plan->getContent(splits[0]) == "abcabcabcd");
+  CHECK(controller.plan->getContent(splits[1]) == "abc");
+}
+
+TEST_CASE("OnlyByteSequencesNoKeep", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "false");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("ababab");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.empty());
+}
+
+TEST_CASE("OnlyByteSequencesTrailing", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing));
+
+  auto trigger_results = controller.trigger("ababab");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 3);
+
+  CHECK(controller.plan->getContent(splits[0]) == "ab");
+  CHECK(controller.plan->getContent(splits[1]) == "ab");
+  CHECK(controller.plan->getContent(splits[2]) == "ab");
+}
+
+TEST_CASE("OnlyByteSequencesLeading", "[NiFi]") {
+  const auto split_content = std::make_shared<SplitContent>("SplitContent");
+  minifi::test::SingleProcessorTestController controller{split_content};
+  split_content->setProperty(SplitContent::ByteSequenceFormatProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text));
+  split_content->setProperty(SplitContent::ByteSequence, "ab");
+  split_content->setProperty(SplitContent::KeepByteSequence, "true");
+  split_content->setProperty(SplitContent::ByteSequenceLocationProperty, 
magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading));
+
+  auto trigger_results = controller.trigger("ababab");
+  auto original = trigger_results.at(processors::SplitContent::Original);
+  auto splits = trigger_results.at(processors::SplitContent::Splits);
+
+  REQUIRE(original.size() == 1);
+  REQUIRE(splits.size() == 3);
+
+  CHECK(controller.plan->getContent(splits[0]) == "ab");
+  CHECK(controller.plan->getContent(splits[1]) == "ab");
+  CHECK(controller.plan->getContent(splits[2]) == "ab");
+}
+}  // namespace org::apache::nifi::minifi::processors::test
diff --git a/libminifi/include/io/InputStream.h 
b/libminifi/include/io/InputStream.h
index f18c5655e..ad24d6908 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -81,6 +81,14 @@ class InputStream : public virtual Stream {
 
     return sizeof(Integral);
   }
+
+  std::optional<std::byte> readByte() {
+    std::array<std::byte, 1> buf{};
+    if (read(buf) != 1) {
+      return std::nullopt;
+    }
+    return buf[0];
+  }
 };
 
 }  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 1d62e3d3a..79c02ff63 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -333,6 +333,7 @@ void ProcessSession::appendBuffer(const 
std::shared_ptr<core::FlowFile>& flow_fi
   appendBuffer(flow_file, as_bytes(buffer));
 }
 void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& 
flow_file, std::span<const std::byte> buffer) {
+  if (buffer.empty()) { return; }
   append(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& 
output_stream) {
     const auto write_status = output_stream->write(buffer);
     return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
diff --git a/libminifi/test/libtest/unit/TestBase.cpp 
b/libminifi/test/libtest/unit/TestBase.cpp
index b83c1509c..a580859f0 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -672,12 +672,20 @@ void TestPlan::validateAnnotations() const {
   }
 }
 
+std::vector<std::byte> TestPlan::getContentAsBytes(const core::FlowFile& 
flow_file) const {
+  const auto content_claim = flow_file.getResourceClaim();
+  const auto content_stream = content_repo_->read(*content_claim);
+  const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+  minifi::internal::pipe(*content_stream, *output_stream);
+  return ranges::to<std::vector>(output_stream->getBuffer());
+}
+
 std::string TestPlan::getContent(const minifi::core::FlowFile& file) const {
-  auto content_claim = file.getResourceClaim();
-  auto content_stream = content_repo_->read(*content_claim);
-  auto output_stream = std::make_shared<minifi::io::BufferStream>();
-  minifi::InputStreamPipe{*output_stream}(content_stream);
-  return utils::span_to<std::string>(minifi::utils::as_span<const 
char>(output_stream->getBuffer()).subspan(file.getOffset(), file.getSize()));
+  const auto content_claim = file.getResourceClaim();
+  const auto content_stream = content_repo_->read(*content_claim);
+  const auto output_stream = std::make_shared<minifi::io::BufferStream>();
+  minifi::internal::pipe(*content_stream, *output_stream);
+  return utils::span_to<std::string>(minifi::utils::as_span<const 
char>(output_stream->getBuffer()));
 }
 
 TestController::TestController()
diff --git a/libminifi/test/libtest/unit/TestBase.h 
b/libminifi/test/libtest/unit/TestBase.h
index 05826b2b6..94c90e9be 100644
--- a/libminifi/test/libtest/unit/TestBase.h
+++ b/libminifi/test/libtest/unit/TestBase.h
@@ -285,6 +285,7 @@ class TestPlan {
     return state_storage_;
   }
 
+  std::vector<std::byte> getContentAsBytes(const core::FlowFile& flow_file) 
const;
   std::string getContent(const std::shared_ptr<const minifi::core::FlowFile>& 
file) const { return getContent(*file); }
   std::string getContent(const minifi::core::FlowFile& file) const;
 

Reply via email to