This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 8022070  MINIFICPP-726 - Enhance ExtractText to have more feature 
parity with the Java impl
8022070 is described below

commit 8022070f4d030559663ba3cfca12dcfa3a909a4e
Author: Arpad Boda <ab...@hortonworks.com>
AuthorDate: Mon Apr 8 12:04:48 2019 +0200

    MINIFICPP-726 - Enhance ExtractText to have more feature parity with the 
Java impl
    
    MINIFICPP-726 - Add error handling for invalid regular expressions
    
    MINIFICPP-726 - Add gcc4.8 compatible solution
    
    This closes #533.
    
    Signed-off-by: Marc Parisi <phroc...@apache.org>
---
 libminifi/include/processors/ExtractText.h |  18 ++-
 libminifi/src/processors/ExtractText.cpp   | 172 ++++++++++++++++++++++++++++-
 libminifi/test/unit/ExtractTextTests.cpp   |  63 +++++++++++
 3 files changed, 244 insertions(+), 9 deletions(-)

diff --git a/libminifi/include/processors/ExtractText.h 
b/libminifi/include/processors/ExtractText.h
index 4caf349..5e6ff4c 100644
--- a/libminifi/include/processors/ExtractText.h
+++ b/libminifi/include/processors/ExtractText.h
@@ -50,19 +50,30 @@ public:
     //! Supported Properties
     static core::Property Attribute;
     static core::Property SizeLimit;
+
+    static core::Property RegexMode;
+    static core::Property IgnoreCaptureGroupZero;
+    static core::Property InsensitiveMatch;
+    static core::Property MaxCaptureGroupLen;
+    static core::Property EnableRepeatingCaptureGroup;
+
     //! Supported Relationships
     static core::Relationship Success;
     //! Default maximum bytes to read into an attribute
     static constexpr int DEFAULT_SIZE_LIMIT = 2 * 1024 * 1024;
 
     //! OnTrigger method, implemented by NiFi ExtractText
-    void onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) override;
+    void onTrigger(core::ProcessContext *context, core::ProcessSession 
*session);
     //! Initialize, over write by NiFi ExtractText
-    void initialize(void) override;
+    void initialize(void);
+
+    virtual bool supportsDynamicProperties() {
+      return true;
+    };
 
     class ReadCallback : public InputStreamCallback {
     public:
-        ReadCallback(std::shared_ptr<core::FlowFile> flowFile, 
core::ProcessContext *ct);
+        ReadCallback(std::shared_ptr<core::FlowFile> flowFile, 
core::ProcessContext *ct, std::shared_ptr<logging::Logger> lgr);
         ~ReadCallback() {}
         int64_t process(std::shared_ptr<io::BaseStream> stream);
 
@@ -70,6 +81,7 @@ public:
         std::shared_ptr<core::FlowFile> flowFile_;
         core::ProcessContext *ctx_;
         std::vector<uint8_t> buffer_;
+        std::shared_ptr<logging::Logger> logger_;
     };
 
 protected:
diff --git a/libminifi/src/processors/ExtractText.cpp 
b/libminifi/src/processors/ExtractText.cpp
index fcd9eea..d1a5c0d 100644
--- a/libminifi/src/processors/ExtractText.cpp
+++ b/libminifi/src/processors/ExtractText.cpp
@@ -21,6 +21,7 @@
 #include <string>
 #include <memory>
 #include <set>
+#include <regex>
 
 #include <iostream>
 #include <sstream>
@@ -30,6 +31,12 @@
 #include "core/ProcessSession.h"
 #include "core/FlowFile.h"
 
+#if !defined(_WIN32)
+#if __cplusplus <= 201103L
+#include <regex.h>
+#endif
+#endif
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -37,13 +44,44 @@ namespace minifi {
 namespace processors {
 
 #define MAX_BUFFER_SIZE 4096
+#define MAX_CAPTURE_GROUP_SIZE 1024
 
 core::Property 
ExtractText::Attribute(core::PropertyBuilder::createProperty("Attribute")->withDescription("Attribute
 to set from content")->build());
 
 // despite there being a size value, ExtractText was initially built with a 
numeric for this property
 core::Property ExtractText::SizeLimit(
-    core::PropertyBuilder::createProperty("Size 
Limit")->withDescription("Maximum number of bytes to read into the attribute. 0 
for no limit. Default is 2MB.")->withDefaultValue<uint32_t>(
-        DEFAULT_SIZE_LIMIT)->build());
+    core::PropertyBuilder::createProperty("Size Limit")
+    ->withDescription("Maximum number of bytes to read into the attribute. 0 
for no limit. Default is 2MB.")
+    ->withDefaultValue<uint32_t>(DEFAULT_SIZE_LIMIT)->build());
+
+core::Property ExtractText::RegexMode(
+    core::PropertyBuilder::createProperty("Regex Mode")
+    ->withDescription("Set this to extract parts of flowfile content using 
regular expressions in dynamic properties")
+    ->withDefaultValue<bool>(false)->build());
+
+core::Property ExtractText::IgnoreCaptureGroupZero(
+    core::PropertyBuilder::createProperty("Include Capture Group 0")
+    ->withDescription("Indicates that Capture Group 0 should be included as an 
attribute. "
+                      "Capture Group 0 represents the entirety of the regular 
expression match, is typically not used, and could have considerable length.")
+    ->withDefaultValue<bool>(true)->build());
+
+core::Property ExtractText::InsensitiveMatch(
+    core::PropertyBuilder::createProperty("Enable Case-insensitive Matching")
+    ->withDescription("Indicates that two characters match even if they are in 
a different case. ")
+    ->withDefaultValue<bool>(false)->build());
+
+core::Property ExtractText::MaxCaptureGroupLen(
+    core::PropertyBuilder::createProperty("Maximum Capture Group Length")
+    ->withDescription("Specifies the maximum number of characters a given 
capture group value can have. "
+                      "Any characters beyond the max will be truncated.")
+    ->withDefaultValue<int>(MAX_CAPTURE_GROUP_SIZE)->build());
+
+
+core::Property ExtractText::EnableRepeatingCaptureGroup(
+    core::PropertyBuilder::createProperty("Enable repeating capture group")
+    ->withDescription("If set to true, every string matching the capture groups 
will be extracted. "
+                      "Otherwise, if the Regular Expression matches more than 
once, only the first match will be extracted.")
+    ->withDefaultValue<bool>(false)->build());
 
 core::Relationship ExtractText::Success("success", "success operational on the 
flow record");
 
@@ -52,6 +90,11 @@ void ExtractText::initialize() {
   std::set<core::Property> properties;
   properties.insert(Attribute);
   properties.insert(SizeLimit);
+  properties.insert(RegexMode);
+  properties.insert(IgnoreCaptureGroupZero);
+  properties.insert(MaxCaptureGroupLen);
+  properties.insert(EnableRepeatingCaptureGroup);
+  properties.insert(InsensitiveMatch);
   setSupportedProperties(properties);
   //! Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -66,7 +109,7 @@ void ExtractText::onTrigger(core::ProcessContext *context, 
core::ProcessSession
     return;
   }
 
-  ReadCallback cb(flowFile, context);
+  ReadCallback cb(flowFile, context, logger_);
   session->read(flowFile, &cb);
   session->transfer(flowFile, Success);
 }
@@ -74,11 +117,13 @@ void ExtractText::onTrigger(core::ProcessContext *context, 
core::ProcessSession
 int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> 
stream) {
   int64_t ret = 0;
   uint64_t read_size = 0;
+  bool regex_mode;
   uint64_t size_limit = flowFile_->getSize();
 
   std::string attrKey, sizeLimitStr;
   ctx_->getProperty(Attribute.getName(), attrKey);
   ctx_->getProperty(SizeLimit.getName(), sizeLimitStr);
+  ctx_->getProperty(RegexMode.getName(), regex_mode);
 
   if (sizeLimitStr == "")
     size_limit = DEFAULT_SIZE_LIMIT;
@@ -104,13 +149,128 @@ int64_t 
ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> strea
     }
   }
 
-  flowFile_->setAttribute(attrKey, contentStream.str());
+  if (regex_mode) {
+    std::regex_constants::syntax_option_type regex_mode = 
std::regex_constants::ECMAScript;
+
+    bool insensitive;
+    if (ctx_->getProperty(InsensitiveMatch.getName(), insensitive) && 
insensitive) {
+      regex_mode |= std::regex_constants::icase;
+    }
+
+    bool ignoregroupzero;
+    ctx_->getProperty(IgnoreCaptureGroupZero.getName(), ignoregroupzero);
+
+    bool repeatingcapture;
+    ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
+
+    int maxCaptureSize;
+    ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSize);
+
+    std::string contentStr = contentStream.str();
+
+    std::map<std::string, std::string> regexAttributes;
+
+    for (const auto& k : ctx_->getDynamicPropertyKeys()) {
+      std::string value;
+      ctx_->getDynamicProperty(k, value);
+
+      std::string workStr = contentStr;
+
+      int matchcount = 0;
+
+#if (__cplusplus > 201103L) || defined(_WIN32)
+
+      std::regex rgx;
+
+      try {
+        rgx = std::regex(value, regex_mode);
+      } catch(const std::regex_error& e) {
+        logger_->log_error("%s error encountered when trying to construct 
regular expression from property (key: %s) value: %s",
+            e.what(), k, value);
+        continue;
+      }
+
+      std::smatch matches;
+
+      while (std::regex_search(workStr, matches, rgx)) {
+        size_t i = ignoregroupzero ? 1 : 0;
+
+        for (; i < matches.size(); ++i, ++matchcount) {
+          std::string attributeValue = matches[i].str();
+          if (attributeValue.length() > maxCaptureSize) {
+            attributeValue = attributeValue.substr(0, maxCaptureSize);
+          }
+          if (matchcount == 0) {
+            regexAttributes[k] = attributeValue;
+          }
+          regexAttributes[k + '.' + std::to_string(matchcount)] = 
attributeValue;
+        }
+        if (!repeatingcapture) {
+          break;
+        }
+        workStr = matches.suffix();
+      }
+#else
+
+      size_t maxGroups = std::count(value.begin(), value.end(), '(') + 1;
+
+      regex_t regexCompiled;
+      std::vector<regmatch_t> groups;
+      groups.reserve(maxGroups);
+
+      if (regcomp(&regexCompiled, value.c_str(), REG_EXTENDED | (insensitive ? 
REG_ICASE : 0))) {
+        logger_->log_error("error encountered when trying to construct regular 
expression from property (key: %s) value: %s",
+                            k, value);
+        continue;
+      }
+
+      while (regexec(&regexCompiled, workStr.c_str(), groups.capacity(), 
groups.data(), 0) == 0) {
+        size_t g = 0;
+        size_t match_len = 0;
+        for (g = 0; g < maxGroups; g++) {
+          if (groups[g].rm_so == -1) {
+            break;  // No more groups
+          }
+
+          if (g == 0) {
+            match_len = groups[g].rm_eo;
+            if (ignoregroupzero) {
+              continue;
+            }
+          }
+
+          std::string attributeValue(workStr.begin() + groups[g].rm_so, 
workStr.begin() + groups[g].rm_eo);
+          if (attributeValue.length() > maxCaptureSize) {
+            attributeValue = attributeValue.substr(0, maxCaptureSize);
+          }
+
+          if (matchcount == 0) {
+            regexAttributes[k] = attributeValue;
+          }
+          regexAttributes[k + '.' + std::to_string(matchcount)] = 
attributeValue;
+          matchcount++;
+        }
+        if (!repeatingcapture || (match_len >= workStr.length())) {
+          break;
+        }
+        workStr = workStr.substr(match_len + 1);
+      }
+#endif
+    }
+
+    for (const auto& kv : regexAttributes) {
+      flowFile_->setAttribute(kv.first, kv.second);
+    }
+  } else {
+    flowFile_->setAttribute(attrKey, contentStream.str());
+  }
   return read_size;
 }
 
-ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> 
flowFile, core::ProcessContext *ctx)
+ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> 
flowFile, core::ProcessContext *ctx,  std::shared_ptr<logging::Logger> lgr)
     : flowFile_(flowFile),
-      ctx_(ctx) {
+      ctx_(ctx),
+      logger_(lgr) {
   buffer_.reserve(std::min<uint64_t>(flowFile->getSize(), MAX_BUFFER_SIZE));
 }
 
diff --git a/libminifi/test/unit/ExtractTextTests.cpp 
b/libminifi/test/unit/ExtractTextTests.cpp
index f07a8bb..fd5604e 100644
--- a/libminifi/test/unit/ExtractTextTests.cpp
+++ b/libminifi/test/unit/ExtractTextTests.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #include <uuid/uuid.h>
+#include <list>
 #include <fstream>
 #include <map>
 #include <memory>
@@ -38,6 +39,7 @@
 #include "processors/LogAttribute.h"
 
 const char* TEST_TEXT = "Test text\n";
+const char* REGEX_TEST_TEXT = "Speed limit 130 | Speed limit 80";
 const char* TEST_FILE = "test_file.txt";
 const char* TEST_ATTR = "ExtractedText";
 
@@ -122,3 +124,64 @@ TEST_CASE("Test usage of ExtractText", 
"[extracttextTest]") {
 
     LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Test usage of ExtractText in regex mode", "[extracttextRegexTest]") 
{
+    TestController testController;
+    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ExtractText>();
+    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>();
+    
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+
+    std::shared_ptr<TestPlan> plan = testController.createPlan();
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    char dir[] = "/tmp/gt.XXXXXX";
+
+    REQUIRE(testController.createTempDirectory(dir) != nullptr);
+    std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", 
"getfileCreate2");
+    plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+    plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), 
"true");
+
+    std::shared_ptr<core::Processor> maprocessor = 
plan->addProcessor("ExtractText", "testExtractText",
+                                                                      
core::Relationship("success", "description"),
+                                                                      true);
+    plan->setProperty(maprocessor, 
org::apache::nifi::minifi::processors::ExtractText::RegexMode.getName(), 
"true");
+    plan->setProperty(maprocessor, 
org::apache::nifi::minifi::processors::ExtractText::IgnoreCaptureGroupZero.getName(),
 "true");
+    plan->setProperty(maprocessor, 
org::apache::nifi::minifi::processors::ExtractText::EnableRepeatingCaptureGroup.getName(),
 "true");
+    plan->setProperty(maprocessor, "RegexAttr", "Speed limit ([0-9]+)", true);
+    plan->setProperty(maprocessor, "InvalidRegex", "[Invalid)A(F)", true);
+
+    std::shared_ptr<core::Processor> laprocessor = 
plan->addProcessor("LogAttribute", "outputLogAttribute",
+                                                                      
core::Relationship("success", "description"),
+                                                                      true);
+    plan->setProperty(laprocessor, 
org::apache::nifi::minifi::processors::LogAttribute::AttributesToLog.getName(),
+                      TEST_ATTR);
+
+    std::stringstream ss;
+    ss << dir << "/" << TEST_FILE;
+    std::string test_file_path = ss.str();
+
+    std::ofstream test_file(test_file_path);
+    if (test_file.is_open()) {
+        test_file << REGEX_TEST_TEXT << std::endl;
+        test_file.close();
+    }
+
+    plan->runNextProcessor();  // GetFile
+    plan->runNextProcessor();  // ExtractText
+    plan->runNextProcessor();  // LogAttribute
+
+    std::list<std::string> suffixes = {"", ".0", ".1"};
+
+    for (const auto& suffix : suffixes) {
+        ss.str("");
+        ss << "key:" << "RegexAttr" << suffix << " value:" << ((suffix == 
".1") ? "80" : "130");
+        std::string log_check = ss.str();
+        REQUIRE(LogTestController::getInstance().contains(log_check));
+    }
+
+    std::string error_str = "error encountered when trying to construct 
regular expression from property (key: InvalidRegex)";
+
+    REQUIRE(LogTestController::getInstance().contains(error_str));
+
+    LogTestController::getInstance().reset();
+}

Reply via email to