This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 107598b95fa507a665e3bd7725df8f99f2314c02
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Sep 16 14:40:22 2025 +0200

    MINIFICPP-2577 Create SplitJson processor
    
    Closes #1980
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |  34 +++++
 .../test/integration/features/split_json.feature   |  42 ++++++
 .../integration/minifi/processors/SplitJson.py     |  26 ++++
 .../standard-processors/processors/SplitJson.cpp   | 133 ++++++++++++++++++
 .../standard-processors/processors/SplitJson.h     | 125 +++++++++++++++++
 .../tests/unit/SplitJsonTests.cpp                  | 150 +++++++++++++++++++++
 6 files changed, 510 insertions(+)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index a5ce38b42..2aecfe389 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -101,6 +101,7 @@ limitations under the License.
 - [RouteText](#RouteText)
 - [SegmentContent](#SegmentContent)
 - [SplitContent](#SplitContent)
+- [SplitJson](#SplitJson)
 - [SplitRecord](#SplitRecord)
 - [SplitText](#SplitText)
 - [TailEventLog](#TailEventLog)
@@ -3132,6 +3133,39 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | segment.original.filename |              | The filename of the parent 
FlowFile                                                                        
                    |
 
 
+## SplitJson
+
+### Description
+
+Splits a JSON File into multiple, separate FlowFiles for an array element 
specified by a JsonPath expression. Each generated FlowFile is comprised of an 
element of the specified array and transferred to relationship 'split,' with 
the original file transferred to the 'original' relationship. If the specified 
JsonPath is not found or does not evaluate to an array element, the original 
file is routed to 'failure' and no files are generated.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values             
      | Description                                                             
                    |
+|-------------------------------|---------------|------------------------------------|---------------------------------------------------------------------------------------------|
+| **JsonPath Expression**       |               |                              
      | A JsonPath expression that indicates the array element to split into 
JSON/scalar fragments. |
+| **Null Value Representation** | empty string  | empty string<br/>the string 
'null' | Indicates the desired representation of JSON Path expressions 
resulting in a null value.    |
+
+### Relationships
+
+| Name     | Description                                                       
                                                                                
                       |
+|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| failure  | If a FlowFile fails processing for any reason (for example, the 
FlowFile is not valid JSON or the specified path does not exist), it will be 
routed to this relationship |
+| original | The original FlowFile that was split into segments. If the 
FlowFile fails processing, nothing will be sent to this relationship            
                              |
+| split    | All segments of the original FlowFile will be routed to this 
relationship                                                                    
                            |
+
+### Output Attributes
+
+| Attribute                 | Relationship    | Description                    
                                                                                
                |
+|---------------------------|-----------------|--------------------------------------------------------------------------------------------------------------------------------|
+| fragment.identifier       | split, original | All split FlowFiles produced 
from the same parent FlowFile will have the same randomly generated UUID added 
for this attribute |
+| fragment.index            | split           | A one-up number that indicates 
the ordering of the split FlowFiles that were created from a single parent 
FlowFile             |
+| fragment.count            | split, original | The number of split FlowFiles 
generated from the parent FlowFile                                              
                 |
+| segment.original.filename | split           | The filename of the parent 
FlowFile                                                                        
                    |
+
+
 ## SplitRecord
 
 ### Description
diff --git a/docker/test/integration/features/split_json.feature 
b/docker/test/integration/features/split_json.feature
new file mode 100644
index 000000000..ef7cbf67c
--- /dev/null
+++ b/docker/test/integration/features/split_json.feature
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+@CORE
+Feature: Splitting JSON content using SplitJson processor
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  # End-to-end flow: GetFile -> SplitJson -> PutFile (both 'split' and 'original') -> LogAttribute.
+  # The wildcard JsonPath below yields three matches (an array, a string and a null), so three split
+  # FlowFiles are expected in /tmp/output next to the unmodified original FlowFile.
+  Scenario: Split multiple query results to separate flow files
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with filename "test_file.json" and content "{"company": {"departments": [{"name": "Engineering", "employees": ["Alice", "Bob"]}, {"name": "Marketing", "employees": "Dave"}, {"name": "Sales", "employees": null}]}}" is present in "/tmp/input"
+    And a SplitJson processor with the "JsonPath Expression" property set to "$.company.departments[*].employees"
+    # With this option the null match is written as the literal text null rather than as an empty file.
+    And the "Null Value Representation" property of the SplitJson processor is set to "the string 'null'"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    # LogAttribute's output is what the "Minifi logs contain" assertions below search for fragment attributes.
+    And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
+    And the "success" relationship of the GetFile processor is connected to the SplitJson
+    And the "split" relationship of the SplitJson processor is connected to the PutFile
+    And the "original" relationship of the SplitJson processor is connected to the PutFile
+    And the "success" relationship of the PutFile processor is connected to the LogAttribute
+    When the MiNiFi instance starts up
+    # Only the first check waits (10 seconds); the later checks use a 0 second timeout, presumably relying
+    # on the first wait having let the whole flow complete -- confirm against the step implementation.
+    Then at least one flowfile with the content "["Alice","Bob"]" is placed in the monitored directory in less than 10 seconds
+    And at least one flowfile with the content "Dave" is placed in the monitored directory in less than 0 seconds
+    And at least one flowfile with the content "null" is placed in the monitored directory in less than 0 seconds
+    And at least one flowfile with the content "{"company": {"departments": [{"name": "Engineering", "employees": ["Alice", "Bob"]}, {"name": "Marketing", "employees": "Dave"}, {"name": "Sales", "employees": null}]}}" is placed in the monitored directory in less than 0 seconds
+    # Three splits: fragment.count is 3 and fragment.index takes the values 0, 1 and 2.
+    And the Minifi logs contain the following message: "key:fragment.count value:3" in less than 0 seconds
+    And the Minifi logs contain the following message: "key:fragment.index value:0" in less than 0 seconds
+    And the Minifi logs contain the following message: "key:fragment.index value:1" in less than 0 seconds
+    And the Minifi logs contain the following message: "key:fragment.index value:2" in less than 0 seconds
+    # Only presence is checked here; the identifier (a generated UUID) and the filename values are not asserted.
+    And the Minifi logs contain the following message: "key:fragment.identifier value:" in less than 0 seconds
+    And the Minifi logs contain the following message: "key:segment.original.filename value:" in less than 0 seconds
diff --git a/docker/test/integration/minifi/processors/SplitJson.py 
b/docker/test/integration/minifi/processors/SplitJson.py
new file mode 100644
index 000000000..d1b6087f7
--- /dev/null
+++ b/docker/test/integration/minifi/processors/SplitJson.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class SplitJson(Processor):
+    def __init__(self, context, schedule={'scheduling strategy': 
'EVENT_DRIVEN'}):
+        super(SplitJson, self).__init__(
+            context=context,
+            clazz='SplitJson',
+            schedule=schedule,
+            auto_terminate=['original', "split", "failure"])
diff --git a/extensions/standard-processors/processors/SplitJson.cpp 
b/extensions/standard-processors/processors/SplitJson.cpp
new file mode 100644
index 000000000..eacbf0001
--- /dev/null
+++ b/extensions/standard-processors/processors/SplitJson.cpp
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SplitJson.h"
+
+#include <unordered_map>
+
+#include "core/ProcessSession.h"
+#include "core/ProcessContext.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/Id.h"
+
+#include "jsoncons_ext/jsonpath/jsonpath.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+/// Registers SplitJson's static property and relationship definitions with the processor framework.
+void SplitJson::initialize() {
+  setSupportedProperties(SplitJson::Properties);
+  setSupportedRelationships(SplitJson::Relationships);
+}
+
+/// Caches the configured JsonPath expression and the null-value rendering option once per scheduling,
+/// so that onTrigger does not re-read the properties for every FlowFile.
+void SplitJson::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*session_factory*/) {
+  json_path_expression_ = utils::parseProperty(context, JsonPathExpression);
+  null_value_representation_ =
+      utils::parseEnumProperty<split_json::NullValueRepresentationOption>(context, NullValueRepresentation);
+}
+
+/// Parses the FlowFile content as JSON and evaluates json_path_expression_ against it.
+///
+/// @param session    session used to read the FlowFile content
+/// @param flow_file  FlowFile whose content is queried
+/// @return the jsoncons::jsonpath::json_query result (onTrigger asserts it is an array of matches), or
+///         std::nullopt after logging an error when the content is empty, is not valid JSON, or the
+///         JsonPath expression cannot be evaluated.
+std::optional<jsoncons::json> SplitJson::queryArrayUsingJsonPath(core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flow_file) const {
+  const auto content = to_string(session.readBuffer(flow_file));
+  if (content.empty()) {
+    logger_->log_error("FlowFile content is empty, transferring to the 'failure' relationship");
+    return std::nullopt;
+  }
+
+  std::optional<jsoncons::json> document;
+  try {
+    document.emplace(jsoncons::json::parse(content));
+  } catch (const jsoncons::json_exception& e) {
+    logger_->log_error("FlowFile content is not a valid JSON document, transferring to the 'failure' relationship: {}", e.what());
+    return std::nullopt;
+  }
+
+  try {
+    return jsoncons::jsonpath::json_query(*document, json_path_expression_);
+  } catch (const jsoncons::jsonpath::jsonpath_error& e) {
+    logger_->log_error("Invalid JSON path expression '{}' set in the 'JsonPath Expression' property: {}", json_path_expression_, e.what());
+    return std::nullopt;
+  }
+}
+
+/// Renders one split element as FlowFile content: strings are written without surrounding quotes, JSON null
+/// according to the "Null Value Representation" property, and every other value as its JSON text.
+std::string SplitJson::jsonValueToString(const jsoncons::json& json_value) const {
+  if (json_value.is_string()) {
+    return json_value.as<std::string>();
+  }
+  if (!json_value.is_null()) {
+    return json_value.to_string();
+  }
+  const bool render_as_empty = null_value_representation_ == split_json::NullValueRepresentationOption::EmptyString;
+  return render_as_empty ? "" : "null";
+}
+
+/// Splits the JSON array selected by the configured JsonPath expression into one FlowFile per element.
+///
+/// Routing, as implemented below:
+///  - empty content, invalid JSON, an invalid JsonPath expression, no match, or a single non-array match
+///    -> the incoming FlowFile goes to 'failure';
+///  - otherwise every element goes to 'split' (with fragment.identifier/index/count and
+///    segment.original.filename set) and the unchanged input goes to 'original' (with
+///    fragment.identifier and fragment.count set).
+void SplitJson::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto flow_file = session.get();
+  if (!flow_file) {
+    context.yield();
+    return;
+  }
+
+  auto query_result = queryArrayUsingJsonPath(session, flow_file);
+  if (!query_result) {
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  // json_query collects every match into one array, so an empty array means the path matched nothing.
+  gsl_Assert(query_result->is_array());
+  if (query_result->empty()) {
+    logger_->log_error("JSON Path expression '{}' did not match the input flow file content, transferring to the 'failure' relationship", json_path_expression_);
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  // A single match is only splittable when it is itself an array; several matches are split one match per FlowFile.
+  // NOTE(review): a wildcard path that happens to produce exactly one match is treated like a definite path here,
+  // i.e. a lone scalar match goes to 'failure' and a lone array match is split element-wise -- confirm intended.
+  if (query_result->size() == 1 && !(*query_result)[0].is_array()) {
+    logger_->log_error("JSON Path expression '{}' did not return an array, transferring to the 'failure' relationship", json_path_expression_);
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  const jsoncons::json& result_array = query_result->size() == 1 ? (*query_result)[0] : *query_result;
+  // Values shared by every split are computed once, outside the loop.
+  const auto fragment_id = utils::IdGenerator::getIdGenerator()->generate().to_string();
+  const auto fragment_count = std::to_string(result_array.size());
+  const auto original_filename = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME).value_or("");
+
+  for (size_t i = 0; i < result_array.size(); ++i) {
+    auto child_flow_file = session.create(flow_file.get());
+    child_flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, child_flow_file->getUUIDStr());
+    child_flow_file->setAttribute(FragmentIndex.name, std::to_string(i));
+    child_flow_file->setAttribute(FragmentCount.name, fragment_count);
+    child_flow_file->setAttribute(FragmentIdentifier.name, fragment_id);
+    child_flow_file->setAttribute(SegmentOriginalFilename.name, original_filename);
+
+    const std::string content = jsonValueToString(result_array[i]);
+    session.write(child_flow_file, [&content](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
+      const auto bytes_written = output_stream->write(reinterpret_cast<const uint8_t*>(content.data()), content.size());
+      // Stream writes signal failure with an error sentinel; report it as -1 instead of letting gsl::narrow throw on it.
+      return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
+    });
+
+    session.transfer(child_flow_file, Split);
+  }
+
+  flow_file->setAttribute(FragmentIdentifier.name, fragment_id);
+  flow_file->setAttribute(FragmentCount.name, fragment_count);
+  session.transfer(flow_file, Original);
+}
+
+REGISTER_RESOURCE(SplitJson, Processor);
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/SplitJson.h 
b/extensions/standard-processors/processors/SplitJson.h
new file mode 100644
index 000000000..ff386c187
--- /dev/null
+++ b/extensions/standard-processors/processors/SplitJson.h
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <string_view>
+#include <array>
+#include <optional>
+
+#include "core/logging/LoggerFactory.h"
+#include "core/ProcessorImpl.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "minifi-cpp/core/PropertyValidator.h"
+#include "core/RelationshipDefinition.h"
+
+#include "jsoncons/json.hpp"
+
+namespace org::apache::nifi::minifi::processors::split_json {
+/// How SplitJson renders a JSON null element as split FlowFile content.
+enum class NullValueRepresentationOption {
+  EmptyString,  // the split FlowFile content is empty
+  Null          // the split FlowFile content is the text null
+};
+}  // namespace org::apache::nifi::minifi::processors::split_json
+
+// magic_enum customization: maps each option to the human-readable name that SplitJson exposes as the
+// allowed values of the "Null Value Representation" property, instead of the enumerator identifiers.
+namespace magic_enum::customize {
+using NullValueRepresentationOption = org::apache::nifi::minifi::processors::split_json::NullValueRepresentationOption;
+
+template <>
+constexpr customize_t enum_name<NullValueRepresentationOption>(NullValueRepresentationOption value) noexcept {
+  switch (value) {
+    case NullValueRepresentationOption::EmptyString:
+      return "empty string";
+    case NullValueRepresentationOption::Null:
+      return "the string 'null'";
+  }
+  return invalid_tag;
+}
+}  // namespace magic_enum::customize
+
+namespace org::apache::nifi::minifi::processors {
+
+/// MiNiFi processor that splits the array selected by a JsonPath expression into one FlowFile per element.
+/// Its properties, relationships and output attributes are declared below as compile-time definitions.
+class SplitJson final : public core::ProcessorImpl {
+ public:
+  EXTENSIONAPI static constexpr const char* Description =
+      "Splits a JSON File into multiple, separate FlowFiles for an array element specified by a JsonPath expression. Each generated FlowFile is comprised of an element of the specified array "
+      "and transferred to relationship 'split,' with the original file transferred to the 'original' relationship. If the specified JsonPath is not found or does not evaluate to an array element, "
+      "the original file is routed to 'failure' and no files are generated.";
+
+  // Required, non-blank; cached in onSchedule and evaluated against each incoming FlowFile.
+  EXTENSIONAPI static constexpr auto JsonPathExpression = core::PropertyDefinitionBuilder<>::createProperty("JsonPath Expression")
+      .withDescription("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.")
+      .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
+      .isRequired(true)
+      .build();
+  // Required; the two allowed values are the magic_enum display names of NullValueRepresentationOption,
+  // and the default is the name of EmptyString.
+  EXTENSIONAPI static constexpr auto NullValueRepresentation = core::PropertyDefinitionBuilder<2>::createProperty("Null Value Representation")
+      .withDescription("Indicates the desired representation of JSON Path expressions resulting in a null value.")
+      .withAllowedValues(magic_enum::enum_names<split_json::NullValueRepresentationOption>())
+      .withDefaultValue(magic_enum::enum_name(split_json::NullValueRepresentationOption::EmptyString))
+      .isRequired(true)
+      .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
+      JsonPathExpression,
+      NullValueRepresentation
+  });
+
+  EXTENSIONAPI static constexpr core::RelationshipDefinition Failure{"failure", "If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON or the specified path "
+      "does not exist), it will be routed to this relationship"};
+  EXTENSIONAPI static constexpr core::RelationshipDefinition Original{"original", "The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent "
+      "to this relationship"};
+  EXTENSIONAPI static constexpr core::RelationshipDefinition Split{"split", "All segments of the original FlowFile will be routed to this relationship"};
+
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Original, Split};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  // onTrigger does not assign any member; it only reads the values cached by onSchedule.
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  // Attributes written by onTrigger; the relationship lists state where each one appears.
+  EXTENSIONAPI static constexpr auto FragmentIdentifier = core::OutputAttributeDefinition<2>{"fragment.identifier", {Split, Original},
+    "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"};
+  EXTENSIONAPI static constexpr auto FragmentIndex = core::OutputAttributeDefinition<>{"fragment.index", {Split},
+    "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"};
+  EXTENSIONAPI static constexpr auto FragmentCount = core::OutputAttributeDefinition<2>{"fragment.count", {Split, Original},
+    "The number of split FlowFiles generated from the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto SegmentOriginalFilename = core::OutputAttributeDefinition<>{"segment.original.filename", {Split},
+    "The filename of the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto OutputAttributes = std::to_array<core::OutputAttributeReference>({
+    FragmentIdentifier,
+    FragmentIndex,
+    FragmentCount,
+    SegmentOriginalFilename
+  });
+
+  using ProcessorImpl::ProcessorImpl;
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+
+ private:
+  /// Parses the FlowFile content as JSON and evaluates json_path_expression_ on it; returns std::nullopt
+  /// (after logging an error) for empty content, invalid JSON or an invalid JsonPath expression.
+  std::optional<jsoncons::json> queryArrayUsingJsonPath(core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flow) const;
+  /// Renders one split element as FlowFile content: strings unquoted, null according to
+  /// null_value_representation_, anything else as JSON text.
+  std::string jsonValueToString(const jsoncons::json& json_value) const;
+
+  std::string json_path_expression_;  // "JsonPath Expression", set in onSchedule
+  split_json::NullValueRepresentationOption null_value_representation_ = split_json::NullValueRepresentationOption::EmptyString;  // "Null Value Representation", set in onSchedule
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/SplitJsonTests.cpp 
b/extensions/standard-processors/tests/unit/SplitJsonTests.cpp
new file mode 100644
index 000000000..e0e126c1a
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/SplitJsonTests.cpp
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "processors/SplitJson.h"
+#include "unit/TestUtils.h"
+#include "unit/ProcessorUtils.h"
+
+namespace org::apache::nifi::minifi::test {
+
+/// Runs a single SplitJson processor in a SingleProcessorTestController and offers a helper that asserts a
+/// successful split (routing, content and fragment attributes).
+class SplitJsonTestFixture {
+ public:
+  SplitJsonTestFixture() :
+      controller_(utils::make_processor<processors::SplitJson>("SplitJson")),
+      split_json_processor_(controller_.getProcessor()) {
+    REQUIRE(split_json_processor_);
+    LogTestController::getInstance().setTrace<processors::SplitJson>();
+  }
+
+ protected:
+  /// Sets the JsonPath expression, triggers the processor once with input_json_content (filename
+  /// "unique_file_name") and checks that nothing failed, the original is passed through unchanged with
+  /// fragment attributes, and each split FlowFile has the expected content, in order, with consistent
+  /// fragment.* and segment.original.filename attributes and a filename of its own.
+  void verifySuccessfulSplit(const std::string& input_json_content, const std::string& json_path_expression, const std::vector<std::string>& expected_split_contents) {
+    REQUIRE(controller_.plan->setProperty(split_json_processor_, processors::SplitJson::JsonPathExpression, json_path_expression));
+
+    auto result = controller_.trigger({{.content = input_json_content, .attributes = {{"filename", "unique_file_name"}}}});
+
+    const auto expected_count = std::to_string(expected_split_contents.size());
+
+    CHECK(result.at(processors::SplitJson::Failure).empty());
+    REQUIRE(result.at(processors::SplitJson::Original).size() == 1);
+    auto original_flow_file = result.at(processors::SplitJson::Original).at(0);
+    CHECK(controller_.plan->getContent(original_flow_file) == input_json_content);
+    // REQUIRE presence first: dereferencing an empty std::optional is undefined behavior.
+    const auto original_fragment_id = original_flow_file->getAttribute(processors::SplitJson::FragmentIdentifier.name);
+    REQUIRE(original_fragment_id.has_value());
+    CHECK_FALSE(original_fragment_id->empty());
+    const auto original_fragment_count = original_flow_file->getAttribute(processors::SplitJson::FragmentCount.name);
+    REQUIRE(original_fragment_count.has_value());
+    CHECK(*original_fragment_count == expected_count);
+
+    REQUIRE(result.at(processors::SplitJson::Split).size() == expected_split_contents.size());
+    for (size_t i = 0; i < expected_split_contents.size(); ++i) {
+      auto flow_file = result.at(processors::SplitJson::Split).at(i);
+      CHECK(controller_.plan->getContent(flow_file) == expected_split_contents[i]);
+      // Comparing a std::optional with a value yields false (a failed CHECK) when the attribute is missing,
+      // instead of throwing std::bad_optional_access out of the test.
+      CHECK(flow_file->getAttribute(processors::SplitJson::FragmentIdentifier.name) == original_fragment_id);
+      CHECK(flow_file->getAttribute(processors::SplitJson::FragmentCount.name) == expected_count);
+      CHECK(flow_file->getAttribute(processors::SplitJson::FragmentIndex.name) == std::to_string(i));
+      CHECK(flow_file->getAttribute(processors::SplitJson::SegmentOriginalFilename.name) == std::string{"unique_file_name"});
+      const auto child_filename = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME);
+      REQUIRE(child_filename.has_value());
+      CHECK(*child_filename != "unique_file_name");
+    }
+  }
+
+  SingleProcessorTestController controller_;
+  core::Processor* split_json_processor_;
+};
+
+// Every early failure path of queryArrayUsingJsonPath routes the FlowFile to 'failure' with a specific error log.
+TEST_CASE_METHOD(SplitJsonTestFixture, "Query fails with parsing issues", "[SplitJsonTests]") {
+  ProcessorTriggerResult result;
+  std::string error_log;
+  // The expression is invalid in every section; the first two sections fail on the content checks, which
+  // run before the expression is evaluated, so only the third section reaches the JsonPath error.
+  REQUIRE(controller_.plan->setProperty(split_json_processor_, processors::SplitJson::JsonPathExpression, "invalid json path"));
+  SECTION("Flow file content is empty") {
+    result = controller_.trigger({{.content = ""}});
+    error_log = "FlowFile content is empty, transferring to the 'failure' relationship";
+  }
+
+  SECTION("Flow file content is invalid json") {
+    result = controller_.trigger({{.content = "invalid json"}});
+    error_log = "FlowFile content is not a valid JSON document, transferring to the 'failure' relationship";
+  }
+
+  SECTION("Json Path expression is invalid") {
+    result = controller_.trigger({{.content = "{}"}});
+    // Prefix only: the logged message ends with the jsoncons error text.
+    error_log = "Invalid JSON path expression 'invalid json path' set in the 'JsonPath Expression' property:";
+  }
+
+  CHECK(result.at(processors::SplitJson::Original).empty());
+  CHECK(result.at(processors::SplitJson::Split).empty());
+  CHECK(result.at(processors::SplitJson::Failure).size() == 1);
+  CHECK(utils::verifyLogLinePresenceInPollTime(1s, error_log));
+}
+
+// A valid expression that matches nothing (json_query returns an empty array) routes the FlowFile to 'failure'.
+TEST_CASE_METHOD(SplitJsonTestFixture, "Query does not match input JSON content", "[SplitJsonTests]") {
+  REQUIRE(controller_.plan->setProperty(split_json_processor_, processors::SplitJson::JsonPathExpression, "$.email"));
+
+  std::string input_json;
+  SECTION("Flow file content does not contain the specified path") {
+    input_json = R"({"name": "John"})";
+  }
+
+  // A JSON null document is valid JSON, so it passes parsing and fails only at the "no match" check.
+  SECTION("Flow file content null") {
+    input_json = "null";
+  }
+
+  auto result = controller_.trigger({{.content = input_json}});
+
+  CHECK(result.at(processors::SplitJson::Original).empty());
+  CHECK(result.at(processors::SplitJson::Split).empty());
+  CHECK(result.at(processors::SplitJson::Failure).size() == 1);
+  CHECK(utils::verifyLogLinePresenceInPollTime(1s, "JSON Path expression '$.email' did not match the input flow file content, transferring to the 'failure' relationship"));
+}
+
+// A single scalar match cannot be split, so the FlowFile is routed to 'failure'.
+TEST_CASE_METHOD(SplitJsonTestFixture, "Query returns non-array result", "[SplitJsonTests]") {
+  const std::string json_path = "$.name";
+  REQUIRE(controller_.plan->setProperty(split_json_processor_, processors::SplitJson::JsonPathExpression, json_path));
+
+  const auto result = controller_.trigger({{.content = R"({"name": "John"})"}});
+  const auto& originals = result.at(processors::SplitJson::Original);
+  const auto& splits = result.at(processors::SplitJson::Split);
+  const auto& failures = result.at(processors::SplitJson::Failure);
+
+  CHECK(originals.empty());
+  CHECK(splits.empty());
+  CHECK(failures.size() == 1);
+  CHECK(utils::verifyLogLinePresenceInPollTime(1s, "JSON Path expression '$.name' did not return an array, transferring to the 'failure' relationship"));
+}
+
+// A path selecting one array splits that array element by element; string elements are written unquoted.
+TEST_CASE_METHOD(SplitJsonTestFixture, "Query returns a single array of scalars", "[SplitJsonTests]") {
+  const std::string input = R"({"names": ["John", "Jane"]})";
+  const std::vector<std::string> expected_splits{"John", "Jane"};
+  verifySuccessfulSplit(input, "$.names", expected_splits);
+}
+
+// The wildcard matches two departments, so each department's "employees" value becomes one split:
+// the array is written as compact JSON text and the string is written unquoted.
+TEST_CASE_METHOD(SplitJsonTestFixture, "Query returns multiple matches", "[SplitJsonTests]") {
+  const std::string json_content = R"({"company": {"departments": [{"name": "Engineering", "employees": ["Alice", "Bob"]}, {"name": "Marketing", "employees": "Dave"}]}})";
+  verifySuccessfulSplit(json_content, "$.company.departments[*].employees", {R"(["Alice","Bob"])", "Dave"});
+}
+
+// Each matched department object becomes one split written as compact JSON; note that the expected member
+// order ("employees" before "name") differs from the order in the input document.
+TEST_CASE_METHOD(SplitJsonTestFixture, "Query returns an array of objects", "[SplitJsonTests]") {
+  const std::string input = R"({"company": {"departments": [{"name": "Engineering", "employees": ["Alice", "Bob"]}, {"name": "Marketing", "employees": "Dave"}]}})";
+  const std::vector<std::string> expected_splits{
+      R"({"employees":["Alice","Bob"],"name":"Engineering"})",
+      R"({"employees":"Dave","name":"Marketing"})"};
+  verifySuccessfulSplit(input, "$.company.departments[*]", expected_splits);
+}
+
+// Null array elements are still split into their own FlowFiles; their content depends on the
+// "Null Value Representation" property (allowed values are the magic_enum display names).
+TEST_CASE_METHOD(SplitJsonTestFixture, "Query returns an array of scalars with null values", "[SplitJsonTests]") {
+  const std::string json_content = R"({"fruits": ["Apple", null, "Banana", null, "Cherry"]})";
+  SECTION("Null value representation is set to empty string") {
+    REQUIRE(controller_.plan->setProperty(split_json_processor_, processors::SplitJson::NullValueRepresentation, "empty string"));
+    verifySuccessfulSplit(json_content, "$.fruits", {"Apple", "", "Banana", "", "Cherry"});
+  }
+
+  SECTION("Null value representation is set to 'null' string") {
+    REQUIRE(controller_.plan->setProperty(split_json_processor_, processors::SplitJson::NullValueRepresentation, "the string 'null'"));
+    verifySuccessfulSplit(json_content, "$.fruits", {"Apple", "null", "Banana", "null", "Cherry"});
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::test

Reply via email to