This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch MINIFICPP-2458 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 21d9dca1658fc252ed9e96d64322af8e678aa95b Author: Gabor Gyimesi <[email protected]> AuthorDate: Thu Jun 12 13:44:54 2025 +0200 MINIFICPP-2458 Create EvaluateJsonPath processor --- LICENSE | 34 ++- NOTICE | 1 + PROCESSORS.md | 27 ++ .../CMakeLists.txt => cmake/Jsoncons.cmake | 23 +- .../features/evaluate_json_path.feature | 50 ++++ .../minifi/processors/EvaluateJsonPath.py | 22 ++ extensions/standard-processors/CMakeLists.txt | 3 +- .../processors/EvaluateJsonPath.cpp | 162 ++++++++++ .../processors/EvaluateJsonPath.h | 195 ++++++++++++ .../tests/unit/EvaluateJsonPathTests.cpp | 330 +++++++++++++++++++++ 10 files changed, 829 insertions(+), 18 deletions(-) diff --git a/LICENSE b/LICENSE index 28eb012c2..a294d6e32 100644 --- a/LICENSE +++ b/LICENSE @@ -437,7 +437,7 @@ The views and conclusions contained in the software and documentation are those should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov. -This product bundles 'concurrentqueue' which is available a Boost Software License. +This product bundles 'concurrentqueue' which is available under a Boost Software License. Boost Software License - Version 1.0 - August 17th, 2003 @@ -3485,3 +3485,35 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +This product bundles 'jsoncons' which is available under a Boost Software License. + +// Copyright Daniel Parker 2013 - 2020. +// Distributed under the Boost Software License, Version 1.0. 
+// (See accompanying file LICENSE or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +Boost Software License - Version 1.0 - August 17th, 2003 + +Permission is hereby granted, free of charge, to any person or organization +obtaining a copy of the software and accompanying documentation covered by +this license (the "Software") to use, reproduce, display, distribute, +execute, and transmit the Software, and to prepare derivative works of the +Software, and to permit third-parties to whom the Software is furnished to +do so, all subject to the following: + +The copyright notices in the Software and this entire statement, including +the above license grant, this restriction and the following disclaimer, +must be included in all copies of the Software, in whole or in part, and +all derivative works of the Software, unless such copies or derivative +works are solely in the form of machine-executable object code generated by +a source language processor. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/NOTICE b/NOTICE index e32c36117..c2b80d9ca 100644 --- a/NOTICE +++ b/NOTICE @@ -78,6 +78,7 @@ This software includes third party software subject to the following copyrights: - llhttp - Copyright Fedor Indutny, 2018. - benchmark - Copyright 2015 Google Inc. - llama.cpp - Copyright (c) 2023-2024 The ggml authors +- jsoncons - Copyright Daniel Parker 2013 - 2020. 
The licenses for these third party components are included in LICENSE.txt diff --git a/PROCESSORS.md b/PROCESSORS.md index bec6d178f..dfeef9556 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -32,6 +32,7 @@ limitations under the License. - [DeleteAzureDataLakeStorage](#DeleteAzureDataLakeStorage) - [DeleteGCSObject](#DeleteGCSObject) - [DeleteS3Object](#DeleteS3Object) +- [EvaluateJsonPath](#EvaluateJsonPath) - [ExecuteProcess](#ExecuteProcess) - [ExecutePythonProcessor](#ExecutePythonProcessor) - [ExecuteScript](#ExecuteScript) @@ -627,6 +628,32 @@ In the list below, the names of required properties appear in bold. Any other pr | failure | FlowFiles are routed to failure relationship | +## EvaluateJsonPath + +### Description + +Evaluates one or more JsonPath expressions against the content of a FlowFile. The results of those expressions are assigned to FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the Processor. JsonPaths are entered by adding user-defined properties; the name of the property maps to the Attribute Name into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). The value of th [...] + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. 
+ +| Name | Default Value | Allowable Values | Description | +|-------------------------------|--------------------|-----------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Destination** | flowfile-attribute | flowfile-content<br/>flowfile-attribute | Indicates whether the results of the JsonPath evaluation are written to the FlowFile content or a FlowFile attribute. If using attribute, must specify the Attribute Name property. If set to flowfile-content, only one JsonPath may be specified, and the property name is ignored. | +| **Null Value Representation** | empty string | empty string<br/>the string 'null' | Indicates the desired representation of JSON Path expressions resulting in a null value. | +| **Path Not Found Behavior** | ignore | warn<br/>ignore<br/>skip | Indicates how to handle missing JSON path expressions when destination is set to 'flowfile-attribute'. Selecting 'warn' will generate a warning when a JSON path expression is not found. Selecting 'skip' will omit attributes for any unmatched JSON path expressions. | +| **Return Type** | auto-detect | auto-detect<br/>json<br/>scalar | Indicates the desired return type of the JSON Path expressions. Selecting 'auto-detect' will set the return type to 'json' for a Destination of 'flowfile-content', and 'scalar' for a Destination of 'flowfile-attribute'. 
| + +### Relationships + +| Name | Description | +|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| failure | FlowFiles are routed to this relationship when the JsonPath cannot be evaluated against the content of the FlowFile; for instance, if the FlowFile is not valid JSON | +| matched | FlowFiles are routed to this relationship when the JsonPath is successfully evaluated and the FlowFile is modified as a result | +| unmatched | FlowFiles are routed to this relationship when the JsonPath does not match the content of the FlowFile and the Destination is set to flowfile-content | + + ## ExecuteProcess ### Description diff --git a/extensions/standard-processors/CMakeLists.txt b/cmake/Jsoncons.cmake similarity index 54% copy from extensions/standard-processors/CMakeLists.txt copy to cmake/Jsoncons.cmake index 7c643c087..82ae6d2ff 100644 --- a/extensions/standard-processors/CMakeLists.txt +++ b/cmake/Jsoncons.cmake @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,22 +14,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-# - - -include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) - -file(GLOB SOURCES "processors/*.cpp" "controllers/*.cpp" "utils/*.cpp" "modbus/*.cpp") - -add_minifi_library(minifi-standard-processors SHARED ${SOURCES}) -target_include_directories(minifi-standard-processors PUBLIC "${CMAKE_SOURCE_DIR}/extensions/standard-processors") -include(RangeV3) -include(Asio) -target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio) +include(FetchContent) -include(Coroutines) -enable_coroutines() +set(JSONCONS_BUILD_TESTS OFF CACHE BOOL "" FORCE) -register_extension(minifi-standard-processors "STANDARD PROCESSORS" STANDARD-PROCESSORS "Provides standard processors" "extensions/standard-processors/tests/") +FetchContent_Declare(jsoncons + URL https://github.com/danielaparker/jsoncons/archive/refs/tags/v1.3.2.tar.gz + URL_HASH SHA256=f22fb163df1a12c2f9ee5f95cad9fc37c6cfbefe0ae6f30aba7440832ef70fbe + ) +FetchContent_MakeAvailable(jsoncons) diff --git a/docker/test/integration/features/evaluate_json_path.feature b/docker/test/integration/features/evaluate_json_path.feature new file mode 100644 index 000000000..71d36647c --- /dev/null +++ b/docker/test/integration/features/evaluate_json_path.feature @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +@CORE +Feature: Writing JSON path query result to attribute or flow file using EvaluateJsonPath processor + Background: + Given the content of "/tmp/output" is monitored + + Scenario: Write query result to flow file + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with filename "test_file.json" and content "{"books": [{"title": "The Great Gatsby", "author": "F. Scott Fitzgerald"}, {"title": "1984", "author": "George Orwell"}]}" is present in "/tmp/input" + And a EvaluateJsonPath processor with the "Destination" property set to "flowfile-content" + And the "JsonPath" property of the EvaluateJsonPath processor is set to "$.books[*].title" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the EvaluateJsonPath + And the "matched" relationship of the EvaluateJsonPath processor is connected to the PutFile + When the MiNiFi instance starts up + Then a flowfile with the JSON content "["The Great Gatsby","1984"]" is placed in the monitored directory in less than 10 seconds + + Scenario: Write query result to attributes + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with filename "test_file.json" and content "{"title": "1984", "author": null}" is present in "/tmp/input" + And a EvaluateJsonPath processor with the "Destination" property set to "flowfile-attribute" + And the "Null Value Representation" property of the EvaluateJsonPath processor is set to "the string 'null'" + And the "Path Not Found Behavior" property of the EvaluateJsonPath processor is set to "skip" + And the "title" property of the EvaluateJsonPath processor is set to "$.title" + And the "author" property of the EvaluateJsonPath processor is set to "$.author" + And the "release" property of the 
EvaluateJsonPath processor is set to "$.release" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor + And the "success" relationship of the GetFile processor is connected to the EvaluateJsonPath + And the "matched" relationship of the EvaluateJsonPath processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + When the MiNiFi instance starts up + Then a flowfile with the JSON content "{"title": "1984", "author": null}" is placed in the monitored directory in less than 10 seconds + And the Minifi logs contain the following message: "key:title value:1984" in less than 10 seconds + And the Minifi logs contain the following message: "key:author value:null" in less than 0 seconds + And the Minifi logs do not contain the following message: "key:release" after 0 seconds diff --git a/docker/test/integration/minifi/processors/EvaluateJsonPath.py b/docker/test/integration/minifi/processors/EvaluateJsonPath.py new file mode 100644 index 000000000..7874963e5 --- /dev/null +++ b/docker/test/integration/minifi/processors/EvaluateJsonPath.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from ..core.Processor import Processor + + +class EvaluateJsonPath(Processor): + def __init__(self, context): + super(EvaluateJsonPath, self).__init__(context=context, clazz='EvaluateJsonPath') diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt index 7c643c087..7c5129fef 100644 --- a/extensions/standard-processors/CMakeLists.txt +++ b/extensions/standard-processors/CMakeLists.txt @@ -27,7 +27,8 @@ target_include_directories(minifi-standard-processors PUBLIC "${CMAKE_SOURCE_DIR include(RangeV3) include(Asio) -target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio) +include(Jsoncons) +target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio jsoncons) include(Coroutines) enable_coroutines() diff --git a/extensions/standard-processors/processors/EvaluateJsonPath.cpp b/extensions/standard-processors/processors/EvaluateJsonPath.cpp new file mode 100644 index 000000000..4b39572da --- /dev/null +++ b/extensions/standard-processors/processors/EvaluateJsonPath.cpp @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "EvaluateJsonPath.h" + +#include <unordered_map> + +#include "core/ProcessSession.h" +#include "core/ProcessContext.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" + +#include "jsoncons_ext/jsonpath/jsonpath.hpp" + +namespace org::apache::nifi::minifi::processors { + +namespace { +bool isScalar(const jsoncons::json& value) { + return !value.is_array() && !value.is_object(); +} + +bool isQueryResultEmptyOrScalar(const jsoncons::json& query_result) { + return query_result.empty() || (query_result.size() == 1 && isScalar(query_result[0])); +} +} // namespace + +void EvaluateJsonPath::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void EvaluateJsonPath::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + destination_ = utils::parseEnumProperty<evaluate_json_path::DestinationType>(context, EvaluateJsonPath::Destination); + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent && getDynamicProperties().size() > 1) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one dynamic property is allowed for JSON path when destination is set to flowfile-content"); + } + null_value_representation_ = utils::parseEnumProperty<evaluate_json_path::NullValueRepresentationOption>(context, EvaluateJsonPath::NullValueRepresentation); + path_not_found_behavior_ = utils::parseEnumProperty<evaluate_json_path::PathNotFoundBehaviorOption>(context, EvaluateJsonPath::PathNotFoundBehavior); + return_type_ = utils::parseEnumProperty<evaluate_json_path::ReturnTypeOption>(context, EvaluateJsonPath::ReturnType); + if (return_type_ == evaluate_json_path::ReturnTypeOption::AutoDetect) { + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) { + return_type_ = evaluate_json_path::ReturnTypeOption::JSON; + } else { + return_type_ = evaluate_json_path::ReturnTypeOption::Scalar; + } + } +} + +std::string EvaluateJsonPath::extractQueryResult(const 
jsoncons::json& query_result) const { + gsl_Expects(!query_result.empty()); + if (query_result.size() > 1) { + gsl_Assert(return_type_ == evaluate_json_path::ReturnTypeOption::JSON); + return query_result.to_string(); + } + + if (query_result[0].is_null()) { + return null_value_representation_ == evaluate_json_path::NullValueRepresentationOption::EmptyString ? "" : "null"; + } + + if (query_result[0].is_string()) { + return query_result[0].as<std::string>(); + } + + return query_result[0].to_string(); +} + +void EvaluateJsonPath::writeQueryResult(core::ProcessSession& session, core::FlowFile& flow_file, const jsoncons::json& query_result, const std::string& property_name, + std::unordered_map<std::string, std::string>& attributes_to_set) const { + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) { + session.write(flow_file, [&query_result, this](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t { + auto result_string = extractQueryResult(query_result); + return gsl::narrow<int64_t>(output_stream->write(reinterpret_cast<const uint8_t*>(result_string.data()), result_string.size())); + }); + } else { + attributes_to_set.emplace(property_name, extractQueryResult(query_result)); + } +} + +void EvaluateJsonPath::onTrigger(core::ProcessContext&, core::ProcessSession& session) { + auto flow_file = session.get(); + if (!flow_file) { + return; + } + + const auto flow_file_read_result = session.readBuffer(flow_file); + const auto json_string = std::string(reinterpret_cast<const char*>(flow_file_read_result.buffer.data()), flow_file_read_result.buffer.size()); + if (json_string.empty()) { + logger_->log_error("FlowFile content is empty, transferring to Failure relationship"); + session.transfer(flow_file, Failure); + return; + } + + jsoncons::json json_object; + try { + json_object = jsoncons::json::parse(json_string); + } catch (const jsoncons::json_exception& e) { + logger_->log_error("FlowFile content is not a valid JSON document, 
transferring to Failure relationship: {}", e.what()); + session.transfer(flow_file, Failure); + return; + } + + std::unordered_map<std::string, std::string> attributes_to_set; + for (const auto& [property_name, json_path] : getDynamicProperties()) { + jsoncons::json query_result; + try { + query_result = jsoncons::jsonpath::json_query(json_object, json_path); + } catch (const jsoncons::jsonpath::jsonpath_error& e) { + logger_->log_error("Invalid JSON path expression '{}' found for attribute key '{}': {}", json_path, property_name, e.what()); + session.transfer(flow_file, Failure); + return; + } + + if (!query_result.is_array() || query_result.empty()) { + if (path_not_found_behavior_ == evaluate_json_path::PathNotFoundBehaviorOption::Warn) { + logger_->log_warn("JSON path '{}' not found for attribute key '{}'", json_path, property_name); + } + + if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) { + logger_->log_debug("JSON path '{}' not found for attribute key '{}', transferring to Unmatched relationship", json_path, property_name); + session.transfer(flow_file, Unmatched); + return; + } + + if (path_not_found_behavior_ != evaluate_json_path::PathNotFoundBehaviorOption::Skip) { + flow_file->setAttribute(property_name, ""); + } + continue; + } + + if (return_type_ == evaluate_json_path::ReturnTypeOption::Scalar && !isQueryResultEmptyOrScalar(query_result)) { + logger_->log_error("JSON path '{}' returned a non-scalar value or multiple values for attribute key '{}', transferring to Failure relationship", json_path, property_name); + session.transfer(flow_file, Failure); + return; + } + + writeQueryResult(session, *flow_file, query_result, property_name, attributes_to_set); + } + + if (destination_ == evaluate_json_path::DestinationType::FlowFileAttribute) { + for (const auto& [key, value] : attributes_to_set) { + session.putAttribute(*flow_file, key, value); + } + } + session.transfer(flow_file, Matched); +} + 
+REGISTER_RESOURCE(EvaluateJsonPath, Processor); + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/EvaluateJsonPath.h b/extensions/standard-processors/processors/EvaluateJsonPath.h new file mode 100644 index 000000000..79af1a806 --- /dev/null +++ b/extensions/standard-processors/processors/EvaluateJsonPath.h @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include <string> +#include <string_view> +#include <array> + +#include "core/logging/LoggerFactory.h" +#include "core/Processor.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "minifi-cpp/core/PropertyValidator.h" +#include "core/RelationshipDefinition.h" + +#include "jsoncons/json.hpp" + +namespace org::apache::nifi::minifi::processors::evaluate_json_path { +enum class DestinationType { + FlowFileContent, + FlowFileAttribute +}; + +enum class NullValueRepresentationOption { + EmptyString, + Null +}; + +enum class ReturnTypeOption { + AutoDetect, + JSON, + Scalar +}; + +enum class PathNotFoundBehaviorOption { + Warn, + Ignore, + Skip +}; +} // namespace org::apache::nifi::minifi::processors::evaluate_json_path + +namespace magic_enum::customize { +using DestinationType = org::apache::nifi::minifi::processors::evaluate_json_path::DestinationType; +using NullValueRepresentationOption = org::apache::nifi::minifi::processors::evaluate_json_path::NullValueRepresentationOption; +using ReturnTypeOption = org::apache::nifi::minifi::processors::evaluate_json_path::ReturnTypeOption; +using PathNotFoundBehaviorOption = org::apache::nifi::minifi::processors::evaluate_json_path::PathNotFoundBehaviorOption; + +template <> +constexpr customize_t enum_name<DestinationType>(DestinationType value) noexcept { + switch (value) { + case DestinationType::FlowFileContent: + return "flowfile-content"; + case DestinationType::FlowFileAttribute: + return "flowfile-attribute"; + } + return invalid_tag; +} + +template <> +constexpr customize_t enum_name<NullValueRepresentationOption>(NullValueRepresentationOption value) noexcept { + switch (value) { + case NullValueRepresentationOption::EmptyString: + return "empty string"; + case NullValueRepresentationOption::Null: + return "the string 'null'"; + } + return invalid_tag; +} + +template <> +constexpr customize_t enum_name<ReturnTypeOption>(ReturnTypeOption value) noexcept 
{ + switch (value) { + case ReturnTypeOption::AutoDetect: + return "auto-detect"; + case ReturnTypeOption::JSON: + return "json"; + case ReturnTypeOption::Scalar: + return "scalar"; + } + return invalid_tag; +} + +template <> +constexpr customize_t enum_name<PathNotFoundBehaviorOption>(PathNotFoundBehaviorOption value) noexcept { + switch (value) { + case PathNotFoundBehaviorOption::Warn: + return "warn"; + case PathNotFoundBehaviorOption::Ignore: + return "ignore"; + case PathNotFoundBehaviorOption::Skip: + return "skip"; + } + return invalid_tag; +} +} // namespace magic_enum::customize + +namespace org::apache::nifi::minifi::processors { + +class EvaluateJsonPath final : public core::ProcessorImpl { + public: + EXTENSIONAPI static constexpr const char* Description = "Evaluates one or more JsonPath expressions against the content of a FlowFile. The results of those expressions are assigned to " + "FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the Processor. JsonPaths are entered by adding user-defined properties; " + "the name of the property maps to the Attribute Name into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). " + "The value of the property must be a valid JsonPath expression. A Return Type of 'auto-detect' will make a determination based off the configured destination. When 'Destination' is set to " + "'flowfile-attribute,' a return type of 'scalar' will be used. When 'Destination' is set to 'flowfile-content,' a return type of 'JSON' will be used.If the JsonPath evaluates to a JSON " + "array or JSON object and the Return Type is set to 'scalar' the FlowFile will be unmodified and will be routed to failure. 
A Return Type of JSON can return scalar values if the provided " + "JsonPath evaluates to the specified value and will be routed as a match.If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be " + "routed to 'unmatched' without having its contents modified. If Destination is 'flowfile-attribute' and the expression matches nothing, attributes will be created with empty strings as the " + "value unless 'Path Not Found Behaviour' is set to 'skip', and the FlowFile will always be routed to 'matched.'"; + + EXTENSIONAPI static constexpr auto Destination = core::PropertyDefinitionBuilder<2>::createProperty("Destination") + .withDescription("Indicates whether the results of the JsonPath evaluation are written to the FlowFile content or a FlowFile attribute. If using attribute, must specify the Attribute Name " + "property. If set to flowfile-content, only one JsonPath may be specified, and the property name is ignored.") + .withAllowedValues(magic_enum::enum_names<evaluate_json_path::DestinationType>()) + .withDefaultValue(magic_enum::enum_name(evaluate_json_path::DestinationType::FlowFileAttribute)) + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto NullValueRepresentation = core::PropertyDefinitionBuilder<2>::createProperty("Null Value Representation") + .withDescription("Indicates the desired representation of JSON Path expressions resulting in a null value.") + .withAllowedValues(magic_enum::enum_names<evaluate_json_path::NullValueRepresentationOption>()) + .withDefaultValue(magic_enum::enum_name(evaluate_json_path::NullValueRepresentationOption::EmptyString)) + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto PathNotFoundBehavior = core::PropertyDefinitionBuilder<3>::createProperty("Path Not Found Behavior") + .withDescription("Indicates how to handle missing JSON path expressions when destination is set to 'flowfile-attribute'. 
Selecting 'warn' will generate a warning when a JSON path expression is " + "not found. Selecting 'skip' will omit attributes for any unmatched JSON path expressions.") + .withAllowedValues(magic_enum::enum_names<evaluate_json_path::PathNotFoundBehaviorOption>()) + .withDefaultValue(magic_enum::enum_name(evaluate_json_path::PathNotFoundBehaviorOption::Ignore)) + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto ReturnType = core::PropertyDefinitionBuilder<3>::createProperty("Return Type") + .withDescription("Indicates the desired return type of the JSON Path expressions. Selecting 'auto-detect' will set the return type to 'json' for a Destination of 'flowfile-content', and " + "'scalar' for a Destination of 'flowfile-attribute'.") + .withAllowedValues(magic_enum::enum_names<evaluate_json_path::ReturnTypeOption>()) + .withDefaultValue(magic_enum::enum_name(evaluate_json_path::ReturnTypeOption::AutoDetect)) + .isRequired(true) + .build(); + + EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ + Destination, + NullValueRepresentation, + PathNotFoundBehavior, + ReturnType + }); + + EXTENSIONAPI static constexpr core::RelationshipDefinition Failure{"failure", "FlowFiles are routed to this relationship when the JsonPath cannot be evaluated against the content of the FlowFile; " + "for instance, if the FlowFile is not valid JSON"}; + EXTENSIONAPI static constexpr core::RelationshipDefinition Matched{"matched", "FlowFiles are routed to this relationship when the JsonPath is successfully evaluated and the FlowFile is modified " + "as a result"}; + EXTENSIONAPI static constexpr core::RelationshipDefinition Unmatched{"unmatched", "FlowFiles are routed to this relationship when the JsonPath does not match the content of the FlowFile and the " + "Destination is set to flowfile-content"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Matched, Unmatched}; + + EXTENSIONAPI static constexpr bool 
SupportsDynamicProperties = true; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + explicit EvaluateJsonPath(const std::string_view name, const utils::Identifier& uuid = {}) + : core::ProcessorImpl(name, uuid) { + logger_ = core::logging::LoggerFactory<EvaluateJsonPath>::getLogger(uuid_); + } + + void initialize() override; + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + + private: + std::string extractQueryResult(const jsoncons::json& query_result) const; + void writeQueryResult(core::ProcessSession& session, core::FlowFile& flow_file, const jsoncons::json& query_result, const std::string& property_name, + std::unordered_map<std::string, std::string>& attributes_to_set) const; + + evaluate_json_path::DestinationType destination_ = evaluate_json_path::DestinationType::FlowFileAttribute; + evaluate_json_path::NullValueRepresentationOption null_value_representation_ = evaluate_json_path::NullValueRepresentationOption::EmptyString; + evaluate_json_path::PathNotFoundBehaviorOption path_not_found_behavior_ = evaluate_json_path::PathNotFoundBehaviorOption::Ignore; + evaluate_json_path::ReturnTypeOption return_type_ = evaluate_json_path::ReturnTypeOption::AutoDetect; +}; + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/EvaluateJsonPathTests.cpp b/extensions/standard-processors/tests/unit/EvaluateJsonPathTests.cpp new file mode 100644 index 000000000..12c354303 --- /dev/null +++ b/extensions/standard-processors/tests/unit/EvaluateJsonPathTests.cpp @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <map>
+#include <memory>
+#include <string>
+
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "processors/EvaluateJsonPath.h"
+#include "unit/TestUtils.h"
+
+namespace org::apache::nifi::minifi::test {
+
+// Fixture shared by every test case below: it hosts a single EvaluateJsonPath processor inside a
+// SingleProcessorTestController, keeps a typed pointer to that processor for property setup and
+// enables trace logging for the processor so the tests can assert on emitted log lines.
+class EvaluateJsonPathTestFixture {
+ public:
+  EvaluateJsonPathTestFixture() :
+      controller_(std::make_unique<processors::EvaluateJsonPath>("EvaluateJsonPath")),
+      evaluate_json_path_processor_(dynamic_cast<processors::EvaluateJsonPath*>(controller_.getProcessor())) {
+    REQUIRE(evaluate_json_path_processor_);
+    LogTestController::getInstance().setTrace<processors::EvaluateJsonPath>();
+  }
+
+ protected:
+  SingleProcessorTestController controller_;
+  processors::EvaluateJsonPath* evaluate_json_path_processor_;
+};
+
+// Two dynamic properties combined with the flowfile-content destination must be rejected when the processor is scheduled.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "When destination is set to flowfile content only one dynamic property is allowed", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute1", "value1");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute2", "value2");
+  REQUIRE_THROWS_WITH(controller_.trigger({{.content = "foo"}}), "Process Schedule Operation: Only one dynamic property is allowed for JSON path when destination is set to flowfile-content");
+}
+
+// Empty or unparsable content is expected on the Failure relationship together with a matching log line.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Input flowfile has invalid JSON as content", "[EvaluateJsonPathTests]") {
+  ProcessorTriggerResult result;
+  std::string error_log;
+  SECTION("Flow file content is empty") {
+    result = controller_.trigger({{.content = ""}});
+    error_log = "FlowFile content is empty, transferring to Failure relationship";
+  }
+
+  SECTION("Flow file content is invalid json") {
+    result = controller_.trigger({{.content = "invalid json"}});
+    error_log = "FlowFile content is not a valid JSON document, transferring to Failure relationship";
+  }
+
+  CHECK(result.at(processors::EvaluateJsonPath::Matched).empty());
+  CHECK(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  CHECK(result.at(processors::EvaluateJsonPath::Failure).size() == 1);
+  CHECK(utils::verifyLogLinePresenceInPollTime(1s, error_log));
+}
+
+// A dynamic property value that is not a JSON path expression is expected to route the unchanged flow file to Failure.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Dynamic property contains invalid JSON path expression", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "1234");
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).size() == 1);
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Failure).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+  CHECK(utils::verifyLogLinePresenceInPollTime(0s, "Invalid JSON path expression '1234' found for attribute key 'attribute'"));
+}
+
+// With the default (attribute) destination, unmatched paths still end up on Matched; the sections cover
+// the three Path Not Found Behavior values: 'ignore' and 'warn' expect empty attributes, 'skip' expects none.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in content when destination is set to attribute", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute1", "$.firstName");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute2", "$.lastName");
+
+  std::map<std::string, std::string> expected_attributes = {
+    {"attribute1", ""},
+    {"attribute2", ""}
+  };
+
+  bool warn_path_not_found_behavior = false;
+  bool expect_attributes = false;
+
+  SECTION("Ignore path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore");
+    expect_attributes = true;
+  }
+
+  SECTION("Skip path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "skip");
+  }
+
+  SECTION("Warn path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "warn");
+    warn_path_not_found_behavior = true;
+    expect_attributes = true;
+  }
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+
+  for (const auto& [key, value] : expected_attributes) {
+    std::string attribute_value;
+    if (!expect_attributes) {
+      CHECK_FALSE(result_flow_file->getAttribute(key, attribute_value));
+    } else {
+      CHECK(result_flow_file->getAttribute(key, attribute_value));
+      CHECK(attribute_value == value);
+    }
+  }
+
+  if (warn_path_not_found_behavior) {
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' not found for attribute key 'attribute1'"));
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.lastName' not found for attribute key 'attribute2'"));
+  }
+}
+
+// With the flowfile-content destination an unmatched path is expected on Unmatched with untouched content,
+// whichever Path Not Found Behavior is selected; only 'warn' additionally expects a log line.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON paths are not found in content when destination is set in content", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "$.firstName");
+
+  bool warn_path_not_found_behavior = false;
+  SECTION("Ignore path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "ignore");
+  }
+
+  SECTION("Skip path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "skip");
+  }
+
+  SECTION("Warn path not found behavior") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::PathNotFoundBehavior, "warn");
+    warn_path_not_found_behavior = true;
+  }
+
+  auto result = controller_.trigger({{.content = "{}"}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Unmatched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "{}");
+
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("attribute", attribute_value));
+
+  if (warn_path_not_found_behavior) {
+    CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.firstName' not found for attribute key 'attribute'"));
+  }
+}
+
+// A path that resolves to a JSON object while the destination is an attribute is expected on Failure with no attribute written.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "JSON path query result does not match the required return type", "[EvaluateJsonPathTests]") {
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "attribute", "$.name");
+
+  SECTION("Return type is set to scalar automatically when destination is set to flowfile-attribute") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-attribute");
+  }
+
+  std::string json_content = R"({"name": {"firstName": "John", "lastName": "Doe"}})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).size() == 1);
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Failure).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == json_content);
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("attribute", attribute_value));
+  CHECK(utils::verifyLogLinePresenceInPollTime(0s, "JSON path '$.name' returned a non-scalar value or multiple values for attribute key 'attribute', transferring to Failure relationship"));
+}
+
+// A matched JSON object is expected to replace the flow file content in compact form, without setting an attribute.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Query JSON object and write it to flow file", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "jsonPath", "$.name");
+
+  std::string json_content = R"({"name": {"firstName": "John", "lastName": "Doe"}})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == R"({"firstName":"John","lastName":"Doe"})");
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("jsonPath", attribute_value));
+}
+
+// Several scalar matches are expected as one attribute per dynamic property, with the content left unchanged.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Query multiple scalars and write them to attributes", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-attribute");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "firstName", "$.name.firstName");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "lastName", "$.name.lastName");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "id", "$.id");
+
+  std::string json_content = R"({"id": 1234, "name": {"firstName": "John", "lastName": "Doe"}})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == json_content);
+  // Require presence before dereferencing so a missing attribute is reported as a failed assertion
+  // rather than as an exception thrown from value().
+  auto attribute_value = result_flow_file->getAttribute("firstName");
+  REQUIRE(attribute_value);
+  CHECK(attribute_value.value() == "John");
+  attribute_value = result_flow_file->getAttribute("lastName");
+  REQUIRE(attribute_value);
+  CHECK(attribute_value.value() == "Doe");
+  attribute_value = result_flow_file->getAttribute("id");
+  REQUIRE(attribute_value);
+  CHECK(attribute_value.value() == "1234");
+}
+
+// A single scalar match is expected to become the flow file content without surrounding quotes.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Query a single scalar and write it to flow file", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "firstName", "$.name.firstName");
+
+  std::string json_content = R"({"id": 1234, "name": {"firstName": "John", "lastName": "Doe"}})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "John");
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("firstName", attribute_value));
+}
+
+// A wildcard path with several matches is expected to be written to the content as a JSON array.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Query has multiple results", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "firstName", "$.users[*].name.firstName");
+
+  std::string json_content = R"({"users": [{"id": 1234, "name": {"firstName": "John", "lastName": "Doe"}}, {"id": 2345, "name": {"firstName": "Jane", "lastName": "Smith"}}]})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == "[\"John\",\"Jane\"]");
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("firstName", attribute_value));
+}
+
+// A JSON null match written to the content is expected as an empty string by default, or as "null" when
+// Null Value Representation is set to "the string 'null'".
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Query result is null value in flow file content", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-content");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "email", "$.name.email");
+
+  std::string expected_content;
+  SECTION("Null value representation is set to empty string") {
+    expected_content = "";
+  }
+
+  SECTION("Null value representation is null string") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::NullValueRepresentation, "the string 'null'");
+    expected_content = "null";
+  }
+
+  std::string json_content = R"({"id": 1234, "name": {"firstName": "John", "lastName": "Doe", "email": null}})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == expected_content);
+  // The only dynamic property configured here is "email", so that is the attribute name that must stay unset.
+  std::string attribute_value;
+  CHECK_FALSE(result_flow_file->getAttribute("email", attribute_value));
+}
+
+// A JSON null match written to an attribute follows the same Null Value Representation rule, while
+// non-null matches in the same run are written normally.
+TEST_CASE_METHOD(EvaluateJsonPathTestFixture, "Query result is null value in flow file attribute", "[EvaluateJsonPathTests]") {
+  controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::Destination, "flowfile-attribute");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "firstName", "$.user.firstName");
+  controller_.plan->setDynamicProperty(evaluate_json_path_processor_, "email", "$.user.email");
+
+  std::string expected_null_value;
+  SECTION("Null value representation is set to empty string") {
+    expected_null_value = "";
+  }
+
+  SECTION("Null value representation is null string") {
+    controller_.plan->setProperty(evaluate_json_path_processor_, processors::EvaluateJsonPath::NullValueRepresentation, "the string 'null'");
+    expected_null_value = "null";
+  }
+
+  std::string json_content = R"({"id": 1234, "user": {"firstName": "John", "lastName": "Doe", "email": null}})";
+  auto result = controller_.trigger({{.content = json_content}});
+
+  REQUIRE(result.at(processors::EvaluateJsonPath::Matched).size() == 1);
+  REQUIRE(result.at(processors::EvaluateJsonPath::Unmatched).empty());
+  REQUIRE(result.at(processors::EvaluateJsonPath::Failure).empty());
+
+  const auto result_flow_file = result.at(processors::EvaluateJsonPath::Matched).at(0);
+
+  CHECK(controller_.plan->getContent(result_flow_file) == json_content);
+  auto attribute = result_flow_file->getAttribute("firstName");
+  REQUIRE(attribute);
+  CHECK(attribute.value() == "John");
+  attribute = result_flow_file->getAttribute("email");
+  REQUIRE(attribute);
+  CHECK(attribute.value() == expected_null_value);
+}
+
+}  // namespace org::apache::nifi::minifi::test
