adamdebreceni commented on a change in pull request #1168:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1168#discussion_r707988352



##########
File path: extensions/standard-processors/processors/RouteText.cpp
##########
@@ -0,0 +1,488 @@
+/**
+ * @file RouteText.cpp
+ * RouteText class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RouteText.h"
+
+#include <map>
+#include <vector>
+#include <utility>
+#include <algorithm>
+#include <set>
+
+#ifdef __APPLE__  // NOTE(review): presumably the Apple toolchain only provides the experimental searcher - confirm
+#include <experimental/functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::experimental::boyer_moore_searcher<It, Hash, 
Eq>;
+#else
+#include <functional>
+template<typename It, typename Hash, typename Eq>
+using boyer_moore_searcher = std::boyer_moore_searcher<It, Hash, Eq>;  // same alias name in both branches, so the code below is written once
+#endif
+
+#include "logging/LoggerConfiguration.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/OptionalUtils.h"
+#include "range/v3/view/transform.hpp"
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/tail.hpp"
+#include "range/v3/view/join.hpp"
+#include "range/v3/view/cache1.hpp"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property RouteText::RoutingStrategy(  // Parsed into routing_ in onSchedule; onTrigger switches on it.
+    core::PropertyBuilder::createProperty("Routing Strategy")
+    ->withDescription("Specifies how to determine which Relationship(s) to use 
when evaluating the segments "
+                      "of incoming text against the 'Matching Strategy' and 
user-defined properties. "
+                      "'Dynamic Routing' routes to all the matching dynamic 
relationships (or 'unmatched' if none matches). "
+                      "'Route On All' routes to 'matched' iff all dynamic 
relationships match. "
+                      "'Route On Any' routes to 'matched' iff any of the 
dynamic relationships match. ")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Routing::DYNAMIC))  // default is Routing::DYNAMIC
+    ->withAllowableValues<std::string>(Routing::values())  // allowable values are taken from Routing::values()
+    ->build());
+
+const core::Property RouteText::MatchingStrategy(  // Parsed into matching_ in onSchedule.
+    core::PropertyBuilder::createProperty("Matching Strategy")
+    ->withDescription("Specifies how to evaluate each segment of incoming text 
against the user-defined properties.")
+    ->isRequired(true)  // no withDefaultValue call here, unlike the other required properties in this file
+    ->withAllowableValues<std::string>(Matching::values())  // allowable values are taken from Matching::values()
+    ->build());
+
+const core::Property RouteText::TrimWhitespace(  // Read into trim_ in onSchedule.
+    core::PropertyBuilder::createProperty("Ignore Leading/Trailing Whitespace")
+    ->withDescription("Indicates whether or not the whitespace at the 
beginning and end should be ignored when evaluating a segment.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(true)  // whitespace is ignored by default
+    ->build());
+
+const core::Property RouteText::IgnoreCase(  // Mapped to case_policy_ (IGNORE_CASE or CASE_SENSITIVE) in onSchedule.
+    core::PropertyBuilder::createProperty("Ignore Case")
+    ->withDescription("If true, capitalization will not be taken into account 
when comparing values. E.g., matching against 'HELLO' or 'hello' will have the 
same result. "
+                      "This property is ignored if the 'Matching Strategy' is 
set to 'Satisfies Expression'.")
+    ->isRequired(true)
+    ->withDefaultValue<bool>(false)  // comparisons are case sensitive by default
+    ->build());
+
+const core::Property RouteText::GroupingRegex(  // Compiled via to_regex into group_regex_ in onSchedule.
+    core::PropertyBuilder::createProperty("Grouping Regular Expression")
+    ->withDescription("Specifies a Regular Expression to evaluate against each 
segment to determine which Group it should be placed in. "
+                      "The Regular Expression must have at least one Capturing 
Group that defines the segment's Group. If multiple Capturing Groups "
+                      "exist in the Regular Expression, the values from all 
Capturing Groups will be concatenated together. Two segments will not be "
+                      "placed into the same FlowFile unless they both have the 
same value for the Group (or neither matches the Regular Expression). "
+                      "For example, to group together all lines in a CSV File 
by the first column, we can set this value to \"(.*?),.*\" (and use \"Per 
Line\" segmentation). "
+                      "Two segments that have the same Group but different 
Relationships will never be placed into the same FlowFile.")
+    ->build());  // no isRequired or withDefaultValue call, unlike most other properties in this file
+
+const core::Property RouteText::GroupingFallbackValue(  // Read into group_fallback_ in onSchedule.
+    core::PropertyBuilder::createProperty("Grouping Fallback Value")
+    ->withDescription("If the 'Grouping Regular Expression' is specified and 
the matching fails, this value will be considered the group of the segment.")
+    ->withDefaultValue<std::string>("")  // the fallback group defaults to the empty string
+    ->build());
+
+const core::Property RouteText::SegmentationStrategy(  // Parsed into segmentation_ in onSchedule and handed to ReadCallback in onTrigger.
+    core::PropertyBuilder::createProperty("Segmentation Strategy")
+    ->withDescription("Specifies what portions of the FlowFile content 
constitutes a single segment to be processed.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(Segmentation::PER_LINE))  // default is Segmentation::PER_LINE
+    ->withAllowableValues<std::string>(Segmentation::values())  // allowable values are taken from Segmentation::values()
+    ->build());
+
+const core::Relationship RouteText::Original("original", "The original input 
file will be routed to this destination");  // registered in initialize()
+
+const core::Relationship RouteText::Unmatched("unmatched", "Segments that do 
not satisfy the required user-defined rules will be routed to this 
Relationship");  // onTrigger appends segments here when the routing rules are not satisfied
+
+const core::Relationship RouteText::Matched("matched", "Segments that satisfy 
the required user-defined rules will be routed to this Relationship");  // used by onTrigger for the ALL and ANY routing strategies
+
+RouteText::RouteText(const std::string& name, const utils::Identifier& uuid)  // Forwards name and uuid to core::Processor and obtains the logger for this class.
+    : core::Processor(name, uuid), 
logger_(logging::LoggerFactory<RouteText>::getLogger()) {}
+
+void RouteText::initialize() {  // Registers the statically defined properties and relationships of this processor.
+  setSupportedProperties({
+     RoutingStrategy,
+     MatchingStrategy,
+     TrimWhitespace,
+     IgnoreCase,
+     GroupingRegex,
+     GroupingFallbackValue,
+     SegmentationStrategy
+  });
+  setSupportedRelationships({Original, Unmatched, Matched});  // NOTE(review): the dynamic relationships used by onTrigger are not registered here - presumably handled elsewhere
+}
+
+static std::regex to_regex(const std::string& str) {  // Named wrapper around the std::regex constructor; passed to utils::map in onSchedule.
+  return std::regex(str);  // default ECMAScript grammar; std::regex_error from an invalid pattern is not caught here
+}
+
+void RouteText::onSchedule(core::ProcessContext* context, 
core::ProcessSessionFactory* /*sessionFactory*/) {  // Caches the configured property values in member fields.
+  routing_ = utils::parseEnumProperty<Routing>(*context, RoutingStrategy);
+  matching_ = utils::parseEnumProperty<Matching>(*context, MatchingStrategy);
+  context->getProperty(TrimWhitespace.getName(), trim_);  // return value is ignored; NOTE(review): presumably trim_ keeps its previous value if the lookup fails - confirm
+  case_policy_ = context->getProperty<bool>(IgnoreCase).value_or(false) ? 
CasePolicy::IGNORE_CASE : CasePolicy::CASE_SENSITIVE;  // a missing value is treated as false, i.e. case sensitive
+  group_regex_ = context->getProperty(GroupingRegex) | utils::map(to_regex);  // NOTE(review): assumes utils::map only invokes to_regex when the property has a value - confirm
+  segmentation_ = utils::parseEnumProperty<Segmentation>(*context, 
SegmentationStrategy);
+  context->getProperty(GroupingFallbackValue.getName(), group_fallback_);  // return value is ignored here as well
+}
+
+class RouteText::ReadCallback : public InputStreamCallback {  // Reads the whole stream content and invokes fn once per segment.
+  using Fn = std::function<void(Segment)>;
+
+ public:
+  explicit ReadCallback(Segmentation segmentation, Fn&& fn) : 
segmentation_(segmentation), fn_(std::move(fn)) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {  // returns the content length, or -1 on a read error
+    std::vector<uint8_t> buffer;
+    std::string_view content;  // non-owning view; backed either by the stream buffer or by the local buffer above
+    if (auto opt_content = stream->tryGetBuffer()) {  // direct access available: view the stream buffer without copying
+      content = std::string_view{reinterpret_cast<const 
char*>(opt_content.value()), stream->size()};
+    } else {
+      // no O(1) content access, read it into our local buffer
+      size_t total_read = 0;
+      size_t remaining = stream->size();
+      buffer.resize(remaining);
+      while (remaining != 0) {
+        size_t ret = stream->read(buffer.data() + total_read, remaining);
+        if (io::isError(ret)) return -1;  // read failure is reported to the caller as -1
+        if (ret == 0) break;  // nothing more to read although bytes remain: stop and keep what was read
+        remaining -= ret;
+        total_read += ret;
+      }
+      buffer.resize(total_read);  // shrink to the bytes actually read
+      content = std::string_view{reinterpret_cast<const char*>(buffer.data()), 
buffer.size()};
+    }
+    switch (segmentation_.value()) {
+      case Segmentation::FULL_TEXT: {
+        fn_({content, 0});  // the whole content is a single segment with index 0
+        return content.length();
+      }
+      case Segmentation::PER_LINE: {
+        // 1-based index as in nifi
+        size_t segment_idx = 1;
+        // do not strip \n\r characters before invocation to be
+        // in-line with the nifi semantics
+        // '\r' is only considered a line terminator if a non-'\n' char follows
+        std::string_view::size_type curr = 0;
+        while (curr < content.length()) {
+          // find beginning of next line
+          std::string_view::size_type next_line = std::string_view::npos;
+          if (auto next_marker = content.find_first_of("\r\n", curr); 
next_marker != std::string_view::npos) {
+            if (content[next_marker] == '\n') {
+              next_line = next_marker + 1;
+            } else if (next_marker + 1 < content.size()) {  // a CR that is the very last character leaves next_line at npos
+              if (content[next_marker + 1] == '\n') {
+                next_line = next_marker + 2;  // CR LF is consumed as one terminator
+              } else {
+                next_line = next_marker + 1;  // lone CR ends the line
+              }
+            }
+          }
+
+          if (next_line == std::string_view::npos) {
+            fn_({content.substr(curr), segment_idx});  // last segment: everything up to the end of the content
+          } else {
+            fn_({content.substr(curr, next_line - curr), segment_idx});  // segment includes its line terminator
+          }
+          curr = next_line;  // npos here ends the loop, since npos is not less than content.length()
+          ++segment_idx;
+        }
+        return content.length();
+      }
+    }
+    throw Exception(PROCESSOR_EXCEPTION, "Unknown segmentation strategy");  // reached only if no case above matched
+  }
+
+ private:
+  Segmentation segmentation_;
+  Fn fn_;
+};
+
+class RouteText::MatchingContext {  // Caches dynamic property values (strings, regexes, searchers) by property name for one flow file.
+  struct CaseAwareHash {  // hash functor for the searcher; lower-cases before hashing unless CASE_SENSITIVE
+    explicit CaseAwareHash(CasePolicy policy): policy_(policy) {}
+    size_t operator()(char ch) const {
+      if (policy_ == CasePolicy::CASE_SENSITIVE) {
+        return static_cast<size_t>(ch);
+      }
+      return std::hash<int>{}(std::tolower(static_cast<unsigned char>(ch)));  // unsigned char cast: std::tolower is undefined for negative non-EOF arguments
+    }
+
+   private:
+    CasePolicy policy_;
+  };
+
+  struct CaseAwareEq {  // equality functor matching CaseAwareHash: chars that compare equal here also hash equally there
+    explicit CaseAwareEq(CasePolicy policy): policy_(policy) {}
+    bool operator()(char a, char b) const {
+      if (policy_ == CasePolicy::CASE_SENSITIVE) {
+        return a == b;
+      }
+      return std::tolower(static_cast<unsigned char>(a)) == 
std::tolower(static_cast<unsigned char>(b));
+    }
+
+   private:
+    CasePolicy policy_;
+  };
+  using Searcher = boyer_moore_searcher<std::string::const_iterator, 
CaseAwareHash, CaseAwareEq>;
+
+ public:
+  MatchingContext(core::ProcessContext& process_context, 
std::shared_ptr<core::FlowFile> flow_file, CasePolicy case_policy)
+    : process_context_(process_context),
+      flow_file_(std::move(flow_file)),
+      case_policy_(case_policy) {}
+
+  const std::regex& getRegexProperty(const core::Property& prop) {  // Returns the cached regex for prop, compiling it on first use.
+    auto it = regex_values_.find(prop.getName());
+    if (it != regex_values_.end()) {
+      return it->second;
+    }
+    std::string value;
+    if (!process_context_.getDynamicProperty(prop, value, flow_file_)) {  // lookup is done with the flow file of this context
+      throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + 
prop.getName() + "'");
+    }
+    std::regex::flag_type flags = std::regex::ECMAScript;
+    if (case_policy_ == CasePolicy::IGNORE_CASE) {
+      flags |= std::regex::icase;
+    }
+    return (regex_values_[prop.getName()] = std::regex(value, flags));  // store in the cache and return a reference to the stored regex
+  }
+
+  const std::string& getStringProperty(const core::Property& prop) {  // Returns the cached string value of prop, fetching it on first use.
+    auto it = string_values_.find(prop.getName());
+    if (it != string_values_.end()) {
+      return it->second;
+    }
+    std::string value;
+    if (!process_context_.getDynamicProperty(prop, value, flow_file_)) {
+      throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + 
prop.getName() + "'");
+    }
+    return (string_values_[prop.getName()] = value);  // store in the cache and return a reference to the stored string
+  }
+
+  const Searcher& getSearcher(const core::Property& prop) {  // Returns the cached searcher for the value of prop, building it on first use.
+    auto it = searcher_values_.find(prop.getName());
+    if (it != searcher_values_.end()) {
+      return it->second.searcher_;
+    }
+    std::string value;
+    if (!process_context_.getDynamicProperty(prop, value, flow_file_)) {
+      throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + 
prop.getName() + "'");
+    }
+
+    return searcher_values_.emplace(  // constructed in place because OwningSearcher is neither copyable nor movable
+        std::piecewise_construct, std::forward_as_tuple(prop.getName()),
+        std::forward_as_tuple(value, case_policy_)).first->second.searcher_;
+  }
+
+  core::ProcessContext& process_context_;
+  std::shared_ptr<core::FlowFile> flow_file_;
+  CasePolicy case_policy_;
+
+  std::map<std::string, std::string> string_values_;
+  std::map<std::string, std::regex> regex_values_;
+
+  struct OwningSearcher {  // Bundles a searcher with the pattern string whose iterators it was built from.
+    explicit OwningSearcher(std::string str, CasePolicy case_policy)
+      : str_(std::move(str)), searcher_(str_.cbegin(), str_.cend(), 
CaseAwareHash{case_policy}, CaseAwareEq{case_policy}) {}
+    OwningSearcher(const OwningSearcher&) = delete;  // searcher_ is built from iterators into str_, so copies and moves are disabled
+    OwningSearcher(OwningSearcher&&) = delete;
+    OwningSearcher& operator=(const OwningSearcher&) = delete;
+    OwningSearcher& operator=(OwningSearcher&&) = delete;
+
+    std::string str_;  // declared before searcher_ so that it is initialized first
+    Searcher searcher_;
+  };
+
+  std::map<std::string, OwningSearcher> searcher_values_;
+};
+
+void RouteText::onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) {
+  auto flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  using GroupName = std::string;
+  std::map<std::pair<core::Relationship, std::optional<GroupName>>, 
std::string> flow_file_contents;
+
+  MatchingContext matching_context(*context, flow_file, case_policy_);
+
+  ReadCallback callback(segmentation_, [&] (Segment segment) {
+    std::string_view original_value = segment.value_;
+    std::string_view preprocessed_value = preprocess(segment.value_);
+
+    if (matching_ != Matching::EXPRESSION) {
+      // an Expression has access to the raw segment like in nifi
+      // all others use the preprocessed_value
+      segment.value_ = preprocessed_value;
+    }
+
+    // group extraction always uses the preprocessed
+    auto group = getGroup(preprocessed_value);
+    switch (routing_.value()) {
+      case Routing::ALL: {
+        for (const auto& prop : dynamic_properties_) {
+          if (!matchSegment(matching_context, segment, prop.second)) {
+            flow_file_contents[{Unmatched, group}] += original_value;
+            return;
+          }
+        }
+        flow_file_contents[{Matched, group}] += original_value;
+        return;
+      }
+      case Routing::ANY: {
+        for (const auto& prop : dynamic_properties_) {
+          if (matchSegment(matching_context, segment, prop.second)) {
+            flow_file_contents[{Matched, group}] += original_value;
+            return;
+          }
+        }
+        flow_file_contents[{Unmatched, group}] += original_value;
+        return;
+      }
+      case Routing::DYNAMIC: {
+        bool routed = false;
+        for (const auto& prop : dynamic_properties_) {
+          if (matchSegment(matching_context, segment, prop.second)) {
+            flow_file_contents[{dynamic_relationships_[prop.first], group}] += 
original_value;
+            routed = true;
+          }
+        }
+        if (!routed) {
+          flow_file_contents[{Unmatched, group}] += original_value;
+        }
+        return;
+      }
+    }
+    throw Exception(PROCESSOR_EXCEPTION, "Unknown routing strategy");
+  });
+  session->read(flow_file, &callback);
+
+  for (const auto& flow_file_content : flow_file_contents) {

Review comment:
       Added a struct to encapsulate `{Relationship, Group?}` and switched to 
structured bindings.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to