adamdebreceni commented on a change in pull request #1168: URL: https://github.com/apache/nifi-minifi-cpp/pull/1168#discussion_r740790952
########## File path: cmake/BuildTests.cmake ########## @@ -111,6 +111,11 @@ FOREACH(testfile ${UNIT_TESTS}) target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1") add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) + if (WIN32) + target_compile_options(${testfilename} PRIVATE "/W") + else() + target_compile_options(${testfilename} PRIVATE "-w") + endif() Review comment: I wanted to disable treating warnings as errors, preferably for a single file (`ProcessContextUtilsTests.cpp`) as `SMART_ENUM` generates methods that are not used ########## File path: extensions/standard-processors/processors/RouteText.cpp ########## @@ -0,0 +1,474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "RouteText.h" + +#include <map> +#include <vector> +#include <utility> +#include <algorithm> +#include <set> + +#ifdef __APPLE__ +#include <experimental/functional> +template<typename It, typename Hash, typename Eq> +using boyer_moore_searcher = std::experimental::boyer_moore_searcher<It, Hash, Eq>; +#else +#include <functional> +template<typename It, typename Hash, typename Eq> +using boyer_moore_searcher = std::boyer_moore_searcher<It, Hash, Eq>; +#endif + +#include "logging/LoggerConfiguration.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/OptionalUtils.h" +#include "range/v3/view/transform.hpp" +#include "range/v3/range/conversion.hpp" +#include "range/v3/view/tail.hpp" +#include "range/v3/view/join.hpp" +#include "range/v3/view/cache1.hpp" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Property RouteText::RoutingStrategy( + core::PropertyBuilder::createProperty("Routing Strategy") + ->withDescription("Specifies how to determine which Relationship(s) to use when evaluating the segments " + "of incoming text against the 'Matching Strategy' and user-defined properties. " + "'Dynamic Routing' routes to all the matching dynamic relationships (or 'unmatched' if none matches). " + "'Route On All' routes to 'matched' iff all dynamic relationships match. " + "'Route On Any' routes to 'matched' iff any of the dynamic relationships match. ") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(Routing::DYNAMIC)) + ->withAllowableValues<std::string>(Routing::values()) + ->build()); + +const core::Property RouteText::MatchingStrategy( + core::PropertyBuilder::createProperty("Matching Strategy") + ->withDescription("Specifies how to evaluate each segment of incoming text against the user-defined properties. " + "Possible values are: 'Starts With', 'Ends With', 'Contains', 'Equals', 'Matches Regex', 'Contains Regex', 'Satisfies Expression'.") + ->isRequired(true) + ->withAllowableValues<std::string>(Matching::values()) + ->build()); + +const core::Property RouteText::TrimWhitespace( + core::PropertyBuilder::createProperty("Ignore Leading/Trailing Whitespace") + ->withDescription("Indicates whether or not the whitespace at the beginning and end should be ignored when evaluating a segment.") + ->isRequired(true) + ->withDefaultValue<bool>(true) + ->build()); + +const core::Property RouteText::IgnoreCase( + core::PropertyBuilder::createProperty("Ignore Case") + ->withDescription("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. " + "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.") + ->isRequired(true) + ->withDefaultValue<bool>(false) + ->build()); + +const core::Property RouteText::GroupingRegex( + core::PropertyBuilder::createProperty("Grouping Regular Expression") + ->withDescription("Specifies a Regular Expression to evaluate against each segment to determine which Group it should be placed in. " + "The Regular Expression must have at least one Capturing Group that defines the segment's Group. If multiple Capturing Groups " + "exist in the Regular Expression, the values from all Capturing Groups will be joined together with \", \". Two segments will not be " + "placed into the same FlowFile unless they both have the same value for the Group (or neither matches the Regular Expression). " + "For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\" (and use \"Per Line\" segmentation). " + "Two segments that have the same Group but different Relationships will never be placed into the same FlowFile.") + ->build()); + +const core::Property RouteText::GroupingFallbackValue( + core::PropertyBuilder::createProperty("Grouping Fallback Value") + ->withDescription("If the 'Grouping Regular Expression' is specified and the matching fails, this value will be considered the group of the segment.") + ->withDefaultValue<std::string>("") + ->build()); + +const core::Property RouteText::SegmentationStrategy( + core::PropertyBuilder::createProperty("Segmentation Strategy") + ->withDescription("Specifies what portions of the FlowFile content constitutes a single segment to be processed. " + "'Full Text' considers the whole content as a single segment, 'Per Line' considers each line of the content as a separate segment") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(Segmentation::PER_LINE)) + ->withAllowableValues<std::string>(Segmentation::values()) + ->build()); + +const core::Relationship RouteText::Original("original", "The original input file will be routed to this destination"); + +const core::Relationship RouteText::Unmatched("unmatched", "Segments that do not satisfy the required user-defined rules will be routed to this Relationship"); + +const core::Relationship RouteText::Matched("matched", "Segments that satisfy the required user-defined rules will be routed to this Relationship"); + +RouteText::RouteText(const std::string& name, const utils::Identifier& uuid) + : core::Processor(name, uuid), logger_(core::logging::LoggerFactory<RouteText>::getLogger()) {} + +void RouteText::initialize() { + setSupportedProperties({ + RoutingStrategy, + MatchingStrategy, + TrimWhitespace, + IgnoreCase, + GroupingRegex, + GroupingFallbackValue, + SegmentationStrategy + }); + setSupportedRelationships({Original, Unmatched, Matched}); +} + +void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) { Review comment: added ########## File path: libminifi/include/core/ProcessContext.h ########## @@ -126,6 +126,23 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: virtual bool getDynamicProperty(const Property &property, std::string &value, const std::shared_ptr<FlowFile>& /*flow_file*/) { return getDynamicProperty(property.getName(), value); } + bool getDynamicProperty(const Property &property, std::string &value, const std::shared_ptr<FlowFile>& flow_file, const std::map<std::string, std::string>& variables) { + std::map<std::string, std::optional<std::string>> original_attributes; + for (const auto& var : variables) { + original_attributes[var.first] = flow_file->getAttribute(var.first); + flow_file->setAttribute(var.first, var.second); + } Review comment: done ########## File path: extensions/standard-processors/processors/RouteText.cpp ########## @@ -0,0 +1,474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "RouteText.h" + +#include <map> +#include <vector> +#include <utility> +#include <algorithm> +#include <set> + +#ifdef __APPLE__ +#include <experimental/functional> +template<typename It, typename Hash, typename Eq> +using boyer_moore_searcher = std::experimental::boyer_moore_searcher<It, Hash, Eq>; +#else +#include <functional> +template<typename It, typename Hash, typename Eq> +using boyer_moore_searcher = std::boyer_moore_searcher<It, Hash, Eq>; +#endif + +#include "logging/LoggerConfiguration.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/OptionalUtils.h" +#include "range/v3/view/transform.hpp" +#include "range/v3/range/conversion.hpp" +#include "range/v3/view/tail.hpp" +#include "range/v3/view/join.hpp" +#include "range/v3/view/cache1.hpp" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Property RouteText::RoutingStrategy( + core::PropertyBuilder::createProperty("Routing Strategy") + ->withDescription("Specifies how to determine which Relationship(s) to use when evaluating the segments " + "of incoming text against the 'Matching Strategy' and user-defined properties. " + "'Dynamic Routing' routes to all the matching dynamic relationships (or 'unmatched' if none matches). " + "'Route On All' routes to 'matched' iff all dynamic relationships match. " + "'Route On Any' routes to 'matched' iff any of the dynamic relationships match. ") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(Routing::DYNAMIC)) + ->withAllowableValues<std::string>(Routing::values()) + ->build()); + +const core::Property RouteText::MatchingStrategy( + core::PropertyBuilder::createProperty("Matching Strategy") + ->withDescription("Specifies how to evaluate each segment of incoming text against the user-defined properties. " + "Possible values are: 'Starts With', 'Ends With', 'Contains', 'Equals', 'Matches Regex', 'Contains Regex', 'Satisfies Expression'.") + ->isRequired(true) + ->withAllowableValues<std::string>(Matching::values()) + ->build()); + +const core::Property RouteText::TrimWhitespace( + core::PropertyBuilder::createProperty("Ignore Leading/Trailing Whitespace") + ->withDescription("Indicates whether or not the whitespace at the beginning and end should be ignored when evaluating a segment.") + ->isRequired(true) + ->withDefaultValue<bool>(true) + ->build()); + +const core::Property RouteText::IgnoreCase( + core::PropertyBuilder::createProperty("Ignore Case") + ->withDescription("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. " + "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.") + ->isRequired(true) + ->withDefaultValue<bool>(false) + ->build()); + +const core::Property RouteText::GroupingRegex( + core::PropertyBuilder::createProperty("Grouping Regular Expression") + ->withDescription("Specifies a Regular Expression to evaluate against each segment to determine which Group it should be placed in. " + "The Regular Expression must have at least one Capturing Group that defines the segment's Group. If multiple Capturing Groups " + "exist in the Regular Expression, the values from all Capturing Groups will be joined together with \", \". Two segments will not be " + "placed into the same FlowFile unless they both have the same value for the Group (or neither matches the Regular Expression). " + "For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\" (and use \"Per Line\" segmentation). " + "Two segments that have the same Group but different Relationships will never be placed into the same FlowFile.") + ->build()); + +const core::Property RouteText::GroupingFallbackValue( + core::PropertyBuilder::createProperty("Grouping Fallback Value") + ->withDescription("If the 'Grouping Regular Expression' is specified and the matching fails, this value will be considered the group of the segment.") + ->withDefaultValue<std::string>("") + ->build()); + +const core::Property RouteText::SegmentationStrategy( + core::PropertyBuilder::createProperty("Segmentation Strategy") + ->withDescription("Specifies what portions of the FlowFile content constitutes a single segment to be processed. " + "'Full Text' considers the whole content as a single segment, 'Per Line' considers each line of the content as a separate segment") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(Segmentation::PER_LINE)) + ->withAllowableValues<std::string>(Segmentation::values()) + ->build()); + +const core::Relationship RouteText::Original("original", "The original input file will be routed to this destination"); + +const core::Relationship RouteText::Unmatched("unmatched", "Segments that do not satisfy the required user-defined rules will be routed to this Relationship"); + +const core::Relationship RouteText::Matched("matched", "Segments that satisfy the required user-defined rules will be routed to this Relationship"); + +RouteText::RouteText(const std::string& name, const utils::Identifier& uuid) + : core::Processor(name, uuid), logger_(core::logging::LoggerFactory<RouteText>::getLogger()) {} + +void RouteText::initialize() { + setSupportedProperties({ + RoutingStrategy, + MatchingStrategy, + TrimWhitespace, + IgnoreCase, + GroupingRegex, + GroupingFallbackValue, + SegmentationStrategy + }); + setSupportedRelationships({Original, Unmatched, Matched}); +} + +void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) { + routing_ = utils::parseEnumProperty<Routing>(*context, RoutingStrategy); + matching_ = utils::parseEnumProperty<Matching>(*context, MatchingStrategy); + context->getProperty(TrimWhitespace.getName(), trim_); + case_policy_ = context->getProperty<bool>(IgnoreCase).value_or(false) ? CasePolicy::IGNORE_CASE : CasePolicy::CASE_SENSITIVE; + group_regex_ = context->getProperty(GroupingRegex) | utils::map([] (const auto& str) {return std::regex(str);}); + segmentation_ = utils::parseEnumProperty<Segmentation>(*context, SegmentationStrategy); + context->getProperty(GroupingFallbackValue.getName(), group_fallback_); +} + +class RouteText::ReadCallback : public InputStreamCallback { + using Fn = std::function<void(Segment)>; + + public: + ReadCallback(Segmentation segmentation, size_t file_size, Fn&& fn) + : segmentation_(segmentation), file_size_(file_size), fn_(std::move(fn)) {} + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::vector<uint8_t> buffer; + size_t ret = stream->read(buffer, file_size_); + if (io::isError(ret)) { + return -1; + } + if (ret != file_size_) { + throw Exception(PROCESS_SESSION_EXCEPTION, "Couldn't read whole flowfile content"); + } + std::string_view content{reinterpret_cast<const char*>(buffer.data()), buffer.size()}; + switch (segmentation_.value()) { + case Segmentation::FULL_TEXT: { + fn_({content, 0}); + return content.length(); + } + case Segmentation::PER_LINE: { + // 1-based index as in nifi + size_t segment_idx = 1; + std::string_view::size_type curr = 0; + while (curr < content.length()) { + // find beginning of next line + std::string_view::size_type next_line = content.find('\n', curr); + + if (next_line == std::string_view::npos) { + fn_({content.substr(curr), segment_idx}); + } else { + // include newline character to be in-line with nifi semantics + ++next_line; + fn_({content.substr(curr, next_line - curr), segment_idx}); + } + curr = next_line; + ++segment_idx; + } + return content.length(); + } + } + throw Exception(PROCESSOR_EXCEPTION, "Unknown segmentation strategy"); + } + + private: + Segmentation segmentation_; + size_t file_size_; + Fn fn_; +}; + +class RouteText::MatchingContext { + struct CaseAwareHash { + explicit CaseAwareHash(CasePolicy policy): policy_(policy) {} + size_t operator()(char ch) const { + if (policy_ == CasePolicy::CASE_SENSITIVE) { + return static_cast<size_t>(ch); + } + return std::hash<int>{}(std::tolower(static_cast<unsigned char>(ch))); + } + + private: + CasePolicy policy_; + }; + + struct CaseAwareEq { + explicit CaseAwareEq(CasePolicy policy): policy_(policy) {} + bool operator()(char a, char b) const { + if (policy_ == CasePolicy::CASE_SENSITIVE) { + return a == b; + } + return std::tolower(static_cast<unsigned char>(a)) == std::tolower(static_cast<unsigned char>(b)); + } + + private: + CasePolicy policy_; + }; + using Searcher = boyer_moore_searcher<std::string::const_iterator, CaseAwareHash, CaseAwareEq>; + + public: + MatchingContext(core::ProcessContext& process_context, std::shared_ptr<core::FlowFile> flow_file, CasePolicy case_policy) + : process_context_(process_context), + flow_file_(std::move(flow_file)), + case_policy_(case_policy) {} + + const std::regex& getRegexProperty(const core::Property& prop) { + auto it = regex_values_.find(prop.getName()); + if (it != regex_values_.end()) { + return it->second; + } + std::string value; + if (!process_context_.getDynamicProperty(prop, value, flow_file_)) { + throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'"); + } + std::regex::flag_type flags = std::regex::ECMAScript; + if (case_policy_ == CasePolicy::IGNORE_CASE) { + flags |= std::regex::icase; + } + return (regex_values_[prop.getName()] = std::regex(value, flags)); + } + + const std::string& getStringProperty(const core::Property& prop) { + auto it = string_values_.find(prop.getName()); + if (it != string_values_.end()) { + return it->second; + } + std::string value; + if (!process_context_.getDynamicProperty(prop, value, flow_file_)) { + throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'"); + } + return (string_values_[prop.getName()] = value); + } + + const Searcher& getSearcher(const core::Property& prop) { + auto it = searcher_values_.find(prop.getName()); + if (it != searcher_values_.end()) { + return it->second.searcher_; + } + std::string value; + if (!process_context_.getDynamicProperty(prop, value, flow_file_)) { + throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'"); + } + + return searcher_values_.emplace( + std::piecewise_construct, std::forward_as_tuple(prop.getName()), + std::forward_as_tuple(value, case_policy_)).first->second.searcher_; + } + + core::ProcessContext& process_context_; + std::shared_ptr<core::FlowFile> flow_file_; + CasePolicy case_policy_; + + std::map<std::string, std::string> string_values_; + std::map<std::string, std::regex> regex_values_; + + struct OwningSearcher { + OwningSearcher(std::string str, CasePolicy case_policy) + : str_(std::move(str)), searcher_(str_.cbegin(), str_.cend(), CaseAwareHash{case_policy}, CaseAwareEq{case_policy}) {} + OwningSearcher(const OwningSearcher&) = delete; + OwningSearcher(OwningSearcher&&) = delete; + OwningSearcher& operator=(const OwningSearcher&) = delete; + OwningSearcher& operator=(OwningSearcher&&) = delete; + + std::string str_; + Searcher searcher_; + }; + + std::map<std::string, OwningSearcher> searcher_values_; +}; + +namespace { +struct Route { + core::Relationship relationship_; + std::optional<std::string> group_name_; + + bool operator<(const Route& other) const { + return std::tie(relationship_, group_name_) < std::tie(other.relationship_, other.group_name_); + } +}; +} // namespace + +void RouteText::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { Review comment: added ########## File path: extensions/standard-processors/processors/RouteText.cpp ########## @@ -0,0 +1,474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "RouteText.h" + +#include <map> +#include <vector> +#include <utility> +#include <algorithm> +#include <set> + +#ifdef __APPLE__ +#include <experimental/functional> +template<typename It, typename Hash, typename Eq> +using boyer_moore_searcher = std::experimental::boyer_moore_searcher<It, Hash, Eq>; +#else +#include <functional> +template<typename It, typename Hash, typename Eq> +using boyer_moore_searcher = std::boyer_moore_searcher<It, Hash, Eq>; +#endif + +#include "logging/LoggerConfiguration.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/OptionalUtils.h" +#include "range/v3/view/transform.hpp" +#include "range/v3/range/conversion.hpp" +#include "range/v3/view/tail.hpp" +#include "range/v3/view/join.hpp" +#include "range/v3/view/cache1.hpp" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Property RouteText::RoutingStrategy( + core::PropertyBuilder::createProperty("Routing Strategy") + ->withDescription("Specifies how to determine which Relationship(s) to use when evaluating the segments " + "of incoming text against the 'Matching Strategy' and user-defined properties. " + "'Dynamic Routing' routes to all the matching dynamic relationships (or 'unmatched' if none matches). " + "'Route On All' routes to 'matched' iff all dynamic relationships match. " + "'Route On Any' routes to 'matched' iff any of the dynamic relationships match. ") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(Routing::DYNAMIC)) + ->withAllowableValues<std::string>(Routing::values()) + ->build()); + +const core::Property RouteText::MatchingStrategy( + core::PropertyBuilder::createProperty("Matching Strategy") + ->withDescription("Specifies how to evaluate each segment of incoming text against the user-defined properties. " + "Possible values are: 'Starts With', 'Ends With', 'Contains', 'Equals', 'Matches Regex', 'Contains Regex', 'Satisfies Expression'.") + ->isRequired(true) + ->withAllowableValues<std::string>(Matching::values()) + ->build()); + +const core::Property RouteText::TrimWhitespace( + core::PropertyBuilder::createProperty("Ignore Leading/Trailing Whitespace") + ->withDescription("Indicates whether or not the whitespace at the beginning and end should be ignored when evaluating a segment.") + ->isRequired(true) + ->withDefaultValue<bool>(true) + ->build()); + +const core::Property RouteText::IgnoreCase( + core::PropertyBuilder::createProperty("Ignore Case") + ->withDescription("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. " + "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.") + ->isRequired(true) + ->withDefaultValue<bool>(false) + ->build()); + +const core::Property RouteText::GroupingRegex( + core::PropertyBuilder::createProperty("Grouping Regular Expression") + ->withDescription("Specifies a Regular Expression to evaluate against each segment to determine which Group it should be placed in. " + "The Regular Expression must have at least one Capturing Group that defines the segment's Group. If multiple Capturing Groups " + "exist in the Regular Expression, the values from all Capturing Groups will be joined together with \", \". Two segments will not be " + "placed into the same FlowFile unless they both have the same value for the Group (or neither matches the Regular Expression). " + "For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\" (and use \"Per Line\" segmentation). " + "Two segments that have the same Group but different Relationships will never be placed into the same FlowFile.") + ->build()); + +const core::Property RouteText::GroupingFallbackValue( + core::PropertyBuilder::createProperty("Grouping Fallback Value") + ->withDescription("If the 'Grouping Regular Expression' is specified and the matching fails, this value will be considered the group of the segment.") + ->withDefaultValue<std::string>("") + ->build()); + +const core::Property RouteText::SegmentationStrategy( + core::PropertyBuilder::createProperty("Segmentation Strategy") + ->withDescription("Specifies what portions of the FlowFile content constitutes a single segment to be processed. " + "'Full Text' considers the whole content as a single segment, 'Per Line' considers each line of the content as a separate segment") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(Segmentation::PER_LINE)) + ->withAllowableValues<std::string>(Segmentation::values()) + ->build()); + +const core::Relationship RouteText::Original("original", "The original input file will be routed to this destination"); + +const core::Relationship RouteText::Unmatched("unmatched", "Segments that do not satisfy the required user-defined rules will be routed to this Relationship"); + +const core::Relationship RouteText::Matched("matched", "Segments that satisfy the required user-defined rules will be routed to this Relationship"); + +RouteText::RouteText(const std::string& name, const utils::Identifier& uuid) + : core::Processor(name, uuid), logger_(core::logging::LoggerFactory<RouteText>::getLogger()) {} + +void RouteText::initialize() { + setSupportedProperties({ + RoutingStrategy, + MatchingStrategy, + TrimWhitespace, + IgnoreCase, + GroupingRegex, + GroupingFallbackValue, + SegmentationStrategy + }); + setSupportedRelationships({Original, Unmatched, Matched}); +} + +void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) { + routing_ = utils::parseEnumProperty<Routing>(*context, RoutingStrategy); + matching_ = utils::parseEnumProperty<Matching>(*context, MatchingStrategy); + context->getProperty(TrimWhitespace.getName(), trim_); + case_policy_ = context->getProperty<bool>(IgnoreCase).value_or(false) ? CasePolicy::IGNORE_CASE : CasePolicy::CASE_SENSITIVE; + group_regex_ = context->getProperty(GroupingRegex) | utils::map([] (const auto& str) {return std::regex(str);}); + segmentation_ = utils::parseEnumProperty<Segmentation>(*context, SegmentationStrategy); + context->getProperty(GroupingFallbackValue.getName(), group_fallback_); +} + +class RouteText::ReadCallback : public InputStreamCallback { + using Fn = std::function<void(Segment)>; + + public: + ReadCallback(Segmentation segmentation, size_t file_size, Fn&& fn) + : segmentation_(segmentation), file_size_(file_size), fn_(std::move(fn)) {} + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + std::vector<uint8_t> buffer; + size_t ret = stream->read(buffer, file_size_); + if (io::isError(ret)) { + return -1; + } + if (ret != file_size_) { + throw Exception(PROCESS_SESSION_EXCEPTION, "Couldn't read whole flowfile content"); + } + std::string_view content{reinterpret_cast<const char*>(buffer.data()), buffer.size()}; + switch (segmentation_.value()) { + case Segmentation::FULL_TEXT: { + fn_({content, 0}); + return content.length(); + } + case Segmentation::PER_LINE: { + // 1-based index as in nifi + size_t segment_idx = 1; + std::string_view::size_type curr = 0; + while (curr < content.length()) { + // find beginning of next line + std::string_view::size_type next_line = content.find('\n', curr); + + if (next_line == std::string_view::npos) { + fn_({content.substr(curr), segment_idx}); + } else { + // include newline character to be in-line with nifi semantics + ++next_line; + fn_({content.substr(curr, next_line - curr), segment_idx}); + } + curr = next_line; + ++segment_idx; + } + return content.length(); + } + } + throw Exception(PROCESSOR_EXCEPTION, "Unknown segmentation strategy"); + } + + private: + Segmentation segmentation_; + size_t file_size_; + Fn fn_; +}; + +class RouteText::MatchingContext { + struct CaseAwareHash { + explicit CaseAwareHash(CasePolicy policy): policy_(policy) {} + size_t operator()(char ch) const { + if (policy_ == CasePolicy::CASE_SENSITIVE) { + return static_cast<size_t>(ch); + } + return std::hash<int>{}(std::tolower(static_cast<unsigned char>(ch))); + } + + private: + CasePolicy policy_; + }; + + struct CaseAwareEq { + explicit CaseAwareEq(CasePolicy policy): policy_(policy) {} + bool operator()(char a, char b) const { + if (policy_ == CasePolicy::CASE_SENSITIVE) { + return a == b; + } + return std::tolower(static_cast<unsigned char>(a)) == std::tolower(static_cast<unsigned char>(b)); + } + + private: + CasePolicy policy_; + }; + using Searcher = boyer_moore_searcher<std::string::const_iterator, CaseAwareHash, CaseAwareEq>; + + public: + MatchingContext(core::ProcessContext& process_context, std::shared_ptr<core::FlowFile> flow_file, CasePolicy case_policy) + : process_context_(process_context), + flow_file_(std::move(flow_file)), + case_policy_(case_policy) {} + + const std::regex& getRegexProperty(const core::Property& prop) { + auto it = regex_values_.find(prop.getName()); + if (it != regex_values_.end()) { + return it->second; + } + std::string value; + if (!process_context_.getDynamicProperty(prop, value, flow_file_)) { + throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'"); + } + std::regex::flag_type flags = std::regex::ECMAScript; + if (case_policy_ == CasePolicy::IGNORE_CASE) { + flags |= std::regex::icase; + } + return (regex_values_[prop.getName()] = std::regex(value, flags)); + } + + const std::string& getStringProperty(const core::Property& prop) { + auto it = string_values_.find(prop.getName()); + if (it != string_values_.end()) { + return it->second; + } + std::string value; + if (!process_context_.getDynamicProperty(prop, value, flow_file_)) { + throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'"); + } + return (string_values_[prop.getName()] = value); + } + + const Searcher& getSearcher(const core::Property& prop) { + auto it = searcher_values_.find(prop.getName()); + if (it != searcher_values_.end()) { + return it->second.searcher_; + } + std::string value; + if (!process_context_.getDynamicProperty(prop, value, flow_file_)) { + throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + prop.getName() + "'"); + } + + return searcher_values_.emplace( + std::piecewise_construct, std::forward_as_tuple(prop.getName()), + std::forward_as_tuple(value, case_policy_)).first->second.searcher_; + } + + core::ProcessContext& process_context_; + std::shared_ptr<core::FlowFile> flow_file_; + CasePolicy case_policy_; + + std::map<std::string, std::string> string_values_; + std::map<std::string, std::regex> regex_values_; + + struct OwningSearcher { + OwningSearcher(std::string str, CasePolicy case_policy) + : str_(std::move(str)), searcher_(str_.cbegin(), str_.cend(), CaseAwareHash{case_policy}, CaseAwareEq{case_policy}) {} + OwningSearcher(const OwningSearcher&) = delete; + OwningSearcher(OwningSearcher&&) = delete; + OwningSearcher& operator=(const OwningSearcher&) = delete; + OwningSearcher& operator=(OwningSearcher&&) = delete; + + std::string str_; + Searcher searcher_; + }; + + std::map<std::string, OwningSearcher> searcher_values_; +}; + +namespace { +struct Route { + core::Relationship relationship_; + std::optional<std::string> group_name_; + + bool operator<(const Route& other) const { + return std::tie(relationship_, group_name_) < std::tie(other.relationship_, other.group_name_); + } Review comment: reverted, what could have been the problem? ########## File path: cmake/BuildTests.cmake ########## @@ -111,6 +111,11 @@ FOREACH(testfile ${UNIT_TESTS}) target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1") add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) + if (WIN32) + target_compile_options(${testfilename} PRIVATE "/W") + else() + target_compile_options(${testfilename} PRIVATE "-w") + endif() Review comment: using the pragma gcc seems to solve it ########## File path: cmake/BuildTests.cmake ########## @@ -111,6 +111,11 @@ FOREACH(testfile ${UNIT_TESTS}) target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1") add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) + if (WIN32) + target_compile_options(${testfilename} PRIVATE "/W") + else() + target_compile_options(${testfilename} PRIVATE "-w") + endif() Review comment: added push/pop -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org