This is an automated email from the ASF dual-hosted git repository. phrocker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push: new 8022070 MINIFICPP-726 - Enhance ExtractText to have more feature parity with the Java impl 8022070 is described below commit 8022070f4d030559663ba3cfca12dcfa3a909a4e Author: Arpad Boda <ab...@hortonworks.com> AuthorDate: Mon Apr 8 12:04:48 2019 +0200 MINIFICPP-726 - Enhance ExtractText to have more feature parity with the Java impl MINIFICPP-726 - Add error handling for invalid regular expressions MINIFICPP-726 - Add gcc4.8 compatible solution This closes #533. Signed-off-by: Marc Parisi <phroc...@apache.org> --- libminifi/include/processors/ExtractText.h | 18 ++- libminifi/src/processors/ExtractText.cpp | 172 ++++++++++++++++++++++++++++- libminifi/test/unit/ExtractTextTests.cpp | 63 +++++++++++ 3 files changed, 244 insertions(+), 9 deletions(-) diff --git a/libminifi/include/processors/ExtractText.h b/libminifi/include/processors/ExtractText.h index 4caf349..5e6ff4c 100644 --- a/libminifi/include/processors/ExtractText.h +++ b/libminifi/include/processors/ExtractText.h @@ -50,19 +50,30 @@ public: //! Supported Properties static core::Property Attribute; static core::Property SizeLimit; + + static core::Property RegexMode; + static core::Property IgnoreCaptureGroupZero; + static core::Property InsensitiveMatch; + static core::Property MaxCaptureGroupLen; + static core::Property EnableRepeatingCaptureGroup; + //! Supported Relationships static core::Relationship Success; //! Default maximum bytes to read into an attribute static constexpr int DEFAULT_SIZE_LIMIT = 2 * 1024 * 1024; //! OnTrigger method, implemented by NiFi ExtractText - void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; + void onTrigger(core::ProcessContext *context, core::ProcessSession *session); //! 
Initialize, over write by NiFi ExtractText - void initialize(void) override; + void initialize(void); + + virtual bool supportsDynamicProperties() { + return true; + }; class ReadCallback : public InputStreamCallback { public: - ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct); + ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct, std::shared_ptr<logging::Logger> lgr); ~ReadCallback() {} int64_t process(std::shared_ptr<io::BaseStream> stream); @@ -70,6 +81,7 @@ public: std::shared_ptr<core::FlowFile> flowFile_; core::ProcessContext *ctx_; std::vector<uint8_t> buffer_; + std::shared_ptr<logging::Logger> logger_; }; protected: diff --git a/libminifi/src/processors/ExtractText.cpp b/libminifi/src/processors/ExtractText.cpp index fcd9eea..d1a5c0d 100644 --- a/libminifi/src/processors/ExtractText.cpp +++ b/libminifi/src/processors/ExtractText.cpp @@ -21,6 +21,7 @@ #include <string> #include <memory> #include <set> +#include <regex> #include <iostream> #include <sstream> @@ -30,6 +31,12 @@ #include "core/ProcessSession.h" #include "core/FlowFile.h" +#if !defined(_WIN32) +#if __cplusplus <= 201103L +#include <regex.h> +#endif +#endif + namespace org { namespace apache { namespace nifi { @@ -37,13 +44,44 @@ namespace minifi { namespace processors { #define MAX_BUFFER_SIZE 4096 +#define MAX_CAPTURE_GROUP_SIZE 1024 core::Property ExtractText::Attribute(core::PropertyBuilder::createProperty("Attribute")->withDescription("Attribute to set from content")->build()); // despite there being a size value, ExtractText was initially built with a numeric for this property core::Property ExtractText::SizeLimit( - core::PropertyBuilder::createProperty("Size Limit")->withDescription("Maximum number of bytes to read into the attribute. 0 for no limit. 
Default is 2MB.")->withDefaultValue<uint32_t>( - DEFAULT_SIZE_LIMIT)->build()); + core::PropertyBuilder::createProperty("Size Limit") + ->withDescription("Maximum number of bytes to read into the attribute. 0 for no limit. Default is 2MB.") + ->withDefaultValue<uint32_t>(DEFAULT_SIZE_LIMIT)->build()); + +core::Property ExtractText::RegexMode( + core::PropertyBuilder::createProperty("Regex Mode") + ->withDescription("Set this to extract parts of flowfile content using regular experssions in dynamic properties") + ->withDefaultValue<bool>(false)->build()); + +core::Property ExtractText::IgnoreCaptureGroupZero( + core::PropertyBuilder::createProperty("Include Capture Group 0") + ->withDescription("Indicates that Capture Group 0 should be included as an attribute. " + "Capture Group 0 represents the entirety of the regular expression match, is typically not used, and could have considerable length.") + ->withDefaultValue<bool>(true)->build()); + +core::Property ExtractText::InsensitiveMatch( + core::PropertyBuilder::createProperty("Enable Case-insensitive Matching") + ->withDescription("Indicates that two characters match even if they are in a different case. ") + ->withDefaultValue<bool>(false)->build()); + +core::Property ExtractText::MaxCaptureGroupLen( + core::PropertyBuilder::createProperty("Maximum Capture Group Length") + ->withDescription("Specifies the maximum number of characters a given capture group value can have. " + "Any characters beyond the max will be truncated.") + ->withDefaultValue<int>(MAX_CAPTURE_GROUP_SIZE)->build()); + + +core::Property ExtractText::EnableRepeatingCaptureGroup( + core::PropertyBuilder::createProperty("Enable repeating capture group") + ->withDescription("f set to true, every string matching the capture groups will be extracted. 
" + "Otherwise, if the Regular Expression matches more than once, only the first match will be extracted.") + ->withDefaultValue<bool>(false)->build()); core::Relationship ExtractText::Success("success", "success operational on the flow record"); @@ -52,6 +90,11 @@ void ExtractText::initialize() { std::set<core::Property> properties; properties.insert(Attribute); properties.insert(SizeLimit); + properties.insert(RegexMode); + properties.insert(IgnoreCaptureGroupZero); + properties.insert(MaxCaptureGroupLen); + properties.insert(EnableRepeatingCaptureGroup); + properties.insert(InsensitiveMatch); setSupportedProperties(properties); //! Set the supported relationships std::set<core::Relationship> relationships; @@ -66,7 +109,7 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession return; } - ReadCallback cb(flowFile, context); + ReadCallback cb(flowFile, context, logger_); session->read(flowFile, &cb); session->transfer(flowFile, Success); } @@ -74,11 +117,13 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) { int64_t ret = 0; uint64_t read_size = 0; + bool regex_mode; uint64_t size_limit = flowFile_->getSize(); std::string attrKey, sizeLimitStr; ctx_->getProperty(Attribute.getName(), attrKey); ctx_->getProperty(SizeLimit.getName(), sizeLimitStr); + ctx_->getProperty(RegexMode.getName(), regex_mode); if (sizeLimitStr == "") size_limit = DEFAULT_SIZE_LIMIT; @@ -104,13 +149,128 @@ int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> strea } } - flowFile_->setAttribute(attrKey, contentStream.str()); + if (regex_mode) { + std::regex_constants::syntax_option_type regex_mode = std::regex_constants::ECMAScript; + + bool insensitive; + if (ctx_->getProperty(InsensitiveMatch.getName(), insensitive) && insensitive) { + regex_mode |= std::regex_constants::icase; + } + + bool ignoregroupzero; + 
ctx_->getProperty(IgnoreCaptureGroupZero.getName(), ignoregroupzero); + + bool repeatingcapture; + ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture); + + int maxCaptureSize; + ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSize); + + std::string contentStr = contentStream.str(); + + std::map<std::string, std::string> regexAttributes; + + for (const auto& k : ctx_->getDynamicPropertyKeys()) { + std::string value; + ctx_->getDynamicProperty(k, value); + + std::string workStr = contentStr; + + int matchcount = 0; + +#if (__cplusplus > 201103L) || defined(_WIN32) + + std::regex rgx; + + try { + rgx = std::regex(value, regex_mode); + } catch(const std::regex_error& e) { + logger_->log_error("%s error encountered when trying to construct regular expression from property (key: %s) value: %s", + e.what(), k, value); + continue; + } + + std::smatch matches; + + while (std::regex_search(workStr, matches, rgx)) { + size_t i = ignoregroupzero ? 1 : 0; + + for (; i < matches.size(); ++i, ++matchcount) { + std::string attributeValue = matches[i].str(); + if (attributeValue.length() > maxCaptureSize) { + attributeValue = attributeValue.substr(0, maxCaptureSize); + } + if (matchcount == 0) { + regexAttributes[k] = attributeValue; + } + regexAttributes[k + '.' + std::to_string(matchcount)] = attributeValue; + } + if (!repeatingcapture) { + break; + } + workStr = matches.suffix(); + } +#else + + size_t maxGroups = std::count(value.begin(), value.end(), '(') + 1; + + regex_t regexCompiled; + std::vector<regmatch_t> groups; + groups.reserve(maxGroups); + + if (regcomp(&regexCompiled, value.c_str(), REG_EXTENDED | (insensitive ? 
REG_ICASE : 0))) { + logger_->log_error("error encountered when trying to construct regular expression from property (key: %s) value: %s", + k, value); + continue; + } + + while (regexec(&regexCompiled, workStr.c_str(), groups.capacity(), groups.data(), 0) == 0) { + size_t g = 0; + size_t match_len = 0; + for (g = 0; g < maxGroups; g++) { + if (groups[g].rm_so == -1) { + break; // No more groups + } + + if (g == 0) { + match_len = groups[g].rm_eo; + if (ignoregroupzero) { + continue; + } + } + + std::string attributeValue(workStr.begin() + groups[g].rm_so, workStr.begin() + groups[g].rm_eo); + if (attributeValue.length() > maxCaptureSize) { + attributeValue = attributeValue.substr(0, maxCaptureSize); + } + + if (matchcount == 0) { + regexAttributes[k] = attributeValue; + } + regexAttributes[k + '.' + std::to_string(matchcount)] = attributeValue; + matchcount++; + } + if (!repeatingcapture || (match_len >= workStr.length())) { + break; + } + workStr = workStr.substr(match_len + 1); + } +#endif + } + + for (const auto& kv : regexAttributes) { + flowFile_->setAttribute(kv.first, kv.second); + } + } else { + flowFile_->setAttribute(attrKey, contentStream.str()); + } return read_size; } -ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx) +ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx, std::shared_ptr<logging::Logger> lgr) : flowFile_(flowFile), - ctx_(ctx) { + ctx_(ctx), + logger_(lgr) { buffer_.reserve(std::min<uint64_t>(flowFile->getSize(), MAX_BUFFER_SIZE)); } diff --git a/libminifi/test/unit/ExtractTextTests.cpp b/libminifi/test/unit/ExtractTextTests.cpp index f07a8bb..fd5604e 100644 --- a/libminifi/test/unit/ExtractTextTests.cpp +++ b/libminifi/test/unit/ExtractTextTests.cpp @@ -16,6 +16,7 @@ * limitations under the License. 
*/ #include <uuid/uuid.h> +#include <list> #include <fstream> #include <map> #include <memory> @@ -38,6 +39,7 @@ #include "processors/LogAttribute.h" const char* TEST_TEXT = "Test text\n"; +const char* REGEX_TEST_TEXT = "Speed limit 130 | Speed limit 80"; const char* TEST_FILE = "test_file.txt"; const char* TEST_ATTR = "ExtractedText"; @@ -122,3 +124,64 @@ TEST_CASE("Test usage of ExtractText", "[extracttextTest]") { LogTestController::getInstance().reset(); } + +TEST_CASE("Test usage of ExtractText in regex mode", "[extracttextRegexTest]") { + TestController testController; + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ExtractText>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>(); + LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + char dir[] = "/tmp/gt.XXXXXX"; + + REQUIRE(testController.createTempDirectory(dir) != nullptr); + std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2"); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), "true"); + + std::shared_ptr<core::Processor> maprocessor = plan->addProcessor("ExtractText", "testExtractText", + core::Relationship("success", "description"), + true); + plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::RegexMode.getName(), "true"); + plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::IgnoreCaptureGroupZero.getName(), "true"); + plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::EnableRepeatingCaptureGroup.getName(), "true"); + plan->setProperty(maprocessor, 
"RegexAttr", "Speed limit ([0-9]+)", true); + plan->setProperty(maprocessor, "InvalidRegex", "[Invalid)A(F)", true); + + std::shared_ptr<core::Processor> laprocessor = plan->addProcessor("LogAttribute", "outputLogAttribute", + core::Relationship("success", "description"), + true); + plan->setProperty(laprocessor, org::apache::nifi::minifi::processors::LogAttribute::AttributesToLog.getName(), + TEST_ATTR); + + std::stringstream ss; + ss << dir << "/" << TEST_FILE; + std::string test_file_path = ss.str(); + + std::ofstream test_file(test_file_path); + if (test_file.is_open()) { + test_file << REGEX_TEST_TEXT << std::endl; + test_file.close(); + } + + plan->runNextProcessor(); // GetFile + plan->runNextProcessor(); // ExtractText + plan->runNextProcessor(); // LogAttribute + + std::list<std::string> suffixes = {"", ".0", ".1"}; + + for (const auto& suffix : suffixes) { + ss.str(""); + ss << "key:" << "RegexAttr" << suffix << " value:" << ((suffix == ".1") ? "80" : "130"); + std::string log_check = ss.str(); + REQUIRE(LogTestController::getInstance().contains(log_check)); + } + + std::string error_str = "error encountered when trying to construct regular expression from property (key: InvalidRegex)"; + + REQUIRE(LogTestController::getInstance().contains(error_str)); + + LogTestController::getInstance().reset(); +}