Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 1e9cf2ec7 -> 72b9a0e1a
MINIFICPP-52 basic ExtractText processor Regex support blocked by dynamic properties #37 Requested changes part 1 Use a vector as the buffer, and add Size Limit parameter This closes #152. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/72b9a0e1 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/72b9a0e1 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/72b9a0e1 Branch: refs/heads/master Commit: 72b9a0e1a154495c27899cfdc35b443e1c2a34da Parents: 1e9cf2e Author: Caleb Johnson <[email protected]> Authored: Tue Oct 17 19:50:13 2017 +0000 Committer: Marc Parisi <[email protected]> Committed: Thu Oct 26 09:47:48 2017 -0400 ---------------------------------------------------------------------- README.md | 1 + libminifi/include/processors/ExtractText.h | 91 ++++++++++++++++++ libminifi/src/processors/ExtractText.cpp | 121 ++++++++++++++++++++++++ libminifi/test/unit/ExtractTextTests.cpp | 101 ++++++++++++++++++++ 4 files changed, 314 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/72b9a0e1/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 5917f57..8f8af9c 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a * PutFile * TailFile * MergeContent + * ExtractText * Provenance events generation is supported and are persisted using RocksDB. 
## System Requirements
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/72b9a0e1/libminifi/include/processors/ExtractText.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ExtractText.h b/libminifi/include/processors/ExtractText.h
new file mode 100644
index 0000000..2d7ba56
--- /dev/null
+++ b/libminifi/include/processors/ExtractText.h
@@ -0,0 +1,91 @@
+/**
+ * @file ExtractText.h
+ * ExtractText class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXTRACT_TEXT_H__
+#define __EXTRACT_TEXT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+//! ExtractText Class: copies flow file content into a flow file attribute
+class ExtractText : public core::Processor {
+public:
+  //! Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ExtractText(std::string name, uuid_t uuid = nullptr)
+  : Processor(name, uuid)
+  {
+    logger_ = logging::LoggerFactory<ExtractText>::getLogger();
+  }
+  //! Processor Name
+  static constexpr char const* ProcessorName = "ExtractText";
+  //! Supported Properties
+  static core::Property Attribute;
+  static core::Property SizeLimit;
+  //! Supported Relationships
+  static core::Relationship Success;
+  //! Default maximum bytes to read into an attribute (2 MiB), used when Size Limit is unset
+  static constexpr int DEFAULT_SIZE_LIMIT = 2 * 1024 * 1024;
+
+  //! OnTrigger: copies the incoming flow file's content into an attribute, then routes it to success
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  //! Initialize: registers the supported properties and relationships
+  void initialize(void);
+
+  class ReadCallback : public InputStreamCallback {
+  public:
+    ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx);
+    ~ReadCallback() {}
+    int64_t process(std::shared_ptr<io::BaseStream> stream);
+
+  private:
+    std::shared_ptr<core::FlowFile> flowFile_;
+    core::ProcessContext *ctx_;
+    int64_t max_read_;  // declared before buffer_: the constructor sizes buffer_ from it
+    std::vector<uint8_t> buffer_;
+  };
+
+protected:
+
+private:
+  //! Logger
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(ExtractText);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/72b9a0e1/libminifi/src/processors/ExtractText.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExtractText.cpp b/libminifi/src/processors/ExtractText.cpp
new file mode 100644
index 0000000..1bb4d9f
--- /dev/null
+++ b/libminifi/src/processors/ExtractText.cpp
@@ -0,0 +1,121 @@
+/**
+ * @file ExtractText.cpp
+ * ExtractText class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <iterator>
+#include <string>
+#include <memory>
+#include <set>
+
+#include <iostream>
+#include <sstream>
+
+#include "processors/ExtractText.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ExtractText::Attribute("Attribute", "Attribute to set from content", "");
+core::Property ExtractText::SizeLimit("Size Limit", "Maximum number of bytes to read into the attribute. 0 for no limit. Default is 2MB.");
+core::Relationship ExtractText::Success("success", "success operational on the flow record");
+
+void ExtractText::initialize() {
+  //! Set the supported properties (Size Limit must be registered to be configurable)
+  std::set<core::Property> properties;
+  properties.insert(Attribute);
+  properties.insert(SizeLimit);
+  setSupportedProperties(properties);
+  //! Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  std::shared_ptr<core::FlowFile> flowFile = session->get();
+
+  if (!flowFile) {
+    return;
+  }
+
+  ReadCallback cb(flowFile, context);
+  session->read(flowFile, &cb);
+  session->transfer(flowFile, Success);
+}
+
+//! Copies up to Size Limit bytes of content into the attribute named by Attribute.
+//! Returns the number of bytes read, or -1 if the stream reports an error.
+int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  std::string attrKey, sizeLimitStr;
+  ctx_->getProperty(Attribute.getName(), attrKey);
+  ctx_->getProperty(SizeLimit.getName(), sizeLimitStr);
+
+  // Unset: use the default limit. "0": no limit, i.e. the whole flow file.
+  int64_t size_limit = flowFile_->getSize();
+  if (sizeLimitStr.empty())
+    size_limit = DEFAULT_SIZE_LIMIT;
+  else if (sizeLimitStr != "0")
+    size_limit = std::stoll(sizeLimitStr);
+
+  std::string content;
+  int64_t read_size = 0;
+
+  while (read_size < size_limit) {
+    // Read at most one buffer's worth and never past the size limit.
+    const int64_t remaining = size_limit - read_size;
+    const int64_t loop_read = remaining < max_read_ ? remaining : max_read_;
+    // Keep room for loop_read bytes even if a previous readData call resized
+    // the vector (NOTE(review): confirm io::BaseStream::readData semantics).
+    if (buffer_.size() < static_cast<std::size_t>(loop_read))
+      buffer_.resize(static_cast<std::size_t>(loop_read));
+
+    const int64_t ret = stream->readData(buffer_, static_cast<int>(loop_read));
+    if (ret < 0) {
+      return -1;
+    }
+    if (ret == 0) {
+      break;  // end of content
+    }
+
+    content.append(reinterpret_cast<const char*>(buffer_.data()), static_cast<std::size_t>(ret));
+    read_size += ret;
+  }
+
+  flowFile_->setAttribute(attrKey, content);
+  return read_size;
+}
+
+//! Allocates a read buffer of one memory page (getpagesize()).
+ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx)
+  : flowFile_(flowFile),
+    ctx_(ctx),
+    max_read_(getpagesize()),
+    buffer_(static_cast<std::size_t>(max_read_)) {
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/72b9a0e1/libminifi/test/unit/ExtractTextTests.cpp
---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ExtractTextTests.cpp b/libminifi/test/unit/ExtractTextTests.cpp new file mode 100644 index 0000000..de5c591 --- /dev/null +++ b/libminifi/test/unit/ExtractTextTests.cpp @@ -0,0 +1,101 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include <iostream>
+
+#include "../TestBase.h"
+#include "core/Core.h"
+
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+
+#include "processors/ExtractText.h"
+#include "processors/LogAttribute.h"
+
+const char* TEST_TEXT = "Test text\n";
+const char* TEST_FILE = "test_file.txt";
+const char* TEST_ATTR = "ExtractedText";
+
+// Constructing the processor yields the requested name and a UUID.
+TEST_CASE("Test Creation of ExtractText", "[extracttextCreate]") {
+  TestController testController;
+  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExtractText>("processorname");
+  REQUIRE(processor->getName() == "processorname");
+  uuid_t processoruuid;
+  REQUIRE(processor->getUUID(processoruuid));
+}
+
+// GetFile -> ExtractText -> LogAttribute: the file content must be logged as the value of TEST_ATTR.
+TEST_CASE("Test usage of ExtractText", "[extracttextTest]") {
+  TestController testController;
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ExtractText>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::FlowFile>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  char dir[] = "/tmp/gt.XXXXXX";
+  REQUIRE(testController.createTempDirectory(dir) != nullptr);
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
+  plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), "true");
+
+  std::shared_ptr<core::Processor> maprocessor = plan->addProcessor("ExtractText", "testExtractText", core::Relationship("success", "description"), true);
+  plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::Attribute.getName(), TEST_ATTR);
+
+  std::shared_ptr<core::Processor> laprocessor = plan->addProcessor("LogAttribute", "outputLogAttribute", core::Relationship("success", "description"), true);
+  plan->setProperty(laprocessor, org::apache::nifi::minifi::processors::LogAttribute::AttributesToLog.getName(), TEST_ATTR);
+
+  std::stringstream ss1;
+  ss1 << dir << "/" << TEST_FILE;
+  std::string test_file_path = ss1.str();
+
+  std::ofstream test_file(test_file_path);
+  if (test_file.is_open()) {
+    test_file << TEST_TEXT;  // exactly TEST_TEXT; std::endl would append a second newline
+    test_file.close();
+  }
+
+  plan->runNextProcessor();  // GetFile
+  plan->runNextProcessor();  // ExtractText
+  plan->runNextProcessor();  // LogAttribute
+
+  std::stringstream ss2;
+  ss2 << "key:" << TEST_ATTR << " value:" << TEST_TEXT;
+  std::string log_check = ss2.str();
+
+  REQUIRE(LogTestController::getInstance().contains(log_check));
+
+  LogTestController::getInstance().reset();
+}
