Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 0a2d25c48 -> b20da80eb
MINIFI-341: Introduce delimiter to TailFile to delimit incoming data appropriately. This closes #116. Signed-off-by: Marc Parisi <phroc...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/b20da80e Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/b20da80e Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/b20da80e Branch: refs/heads/master Commit: b20da80eb73d49286041ed3f7a2e04afb6b0a7f3 Parents: 0a2d25c Author: Jeremy Dyer <jeremyd...@apache.org> Authored: Thu Jun 22 22:29:21 2017 -0400 Committer: Marc Parisi <phroc...@apache.org> Committed: Wed Jul 5 13:58:39 2017 -0400 ---------------------------------------------------------------------- .gitignore | 3 + libminifi/include/core/ProcessSession.h | 2 + libminifi/include/processors/TailFile.h | 10 ++ libminifi/src/core/ProcessSession.cpp | 85 ++++++++++++ libminifi/src/processors/TailFile.cpp | 68 +++++++--- libminifi/test/resources/TestTailFile.txt | 2 + libminifi/test/unit/TailFileTests.cpp | 171 +++++++++++++++++++++++++ 7 files changed, 325 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 4a4c064..3032f38 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,9 @@ # Ignore JetBrains project files .idea +# Ignore Eclipse project files.
+.project
+
 # Ignore kdevelop metadata
 nifi-minifi-cpp.kdev4
 .kdev4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 4d20f59..ad79d12 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -123,6 +123,13 @@ class ProcessSession {
   void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
               bool keepSource = true, uint64_t offset = 0);
+  /**
+   * Imports source from offset onward, splitting it on inputDelimiter and appending
+   * one FlowFile per segment to flows (the delimiter itself is not stored).
+   * @return number of bytes consumed from source, delimiters included.
+   */
+  uint64_t import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows,
+                  bool keepSource, uint64_t offset, char inputDelimiter);
 
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h
index 59cf224..5e166cf 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -54,10 +54,18 @@ class TailFile : public core::Processor {
   // Supported Properties
   static core::Property FileName;
   static core::Property StateFile;
+  static core::Property Delimiter;
   // Supported Relationships
   static core::Relationship Success;
 
  public:
+  /**
+   * Reads the Input Delimiter property when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
   // OnTrigger method, implemented by NiFi TailFile
   virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
   // Initialize, over write by NiFi TailFile
@@ -75,6 +83,8 @@ class TailFile : public core::Processor {
   std::string _stateFile;
   // State related to the tailed file
   std::string _currentTailFileName;
+  // Delimiter for the data incoming from the tailed file.
+  std::string _delimiter;
   // determine if state is recovered;
   bool _stateRecovered;
   uint64_t _currentTailFilePosition;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 037660f..df21a34 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -626,6 +626,93 @@ bool keepSource,
   }
 }
 
+uint64_t ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows,
+                                bool keepSource, uint64_t offset, char inputDelimiter) {
+  std::shared_ptr<ResourceClaim> claim;
+
+  std::shared_ptr<FlowFileRecord> flowFile;
+
+  // Bytes read from source past offset, delimiters included.
+  uint64_t consumed = 0;
+
+  try {
+    // Open the input file and seek to the appropriate location.
+    std::ifstream input;
+    input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+    if (!input.is_open()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    }
+    input.seekg(offset, input.beg);
+
+    std::string segment;
+    // std::getline fails only when it extracts nothing, so reaching EOF right after a
+    // delimiter ends the loop instead of producing an empty trailing FlowFile. A
+    // std::string also removes the old fixed 4096-byte limit on segment length.
+    while (std::getline(input, segment, inputDelimiter)) {
+      // eof() is set only when the segment ran into the end of the file instead of
+      // being terminated by inputDelimiter.
+      const bool terminated = !input.eof();
+      flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+      claim = std::make_shared<ResourceClaim>();
+      uint64_t startTime = getTimeMillis();
+      std::ofstream fs;
+      fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+      if (!fs.is_open()) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+      }
+
+      fs.write(segment.data(), static_cast<std::streamsize>(segment.size()));
+      if (!fs.good() || fs.tellp() < 0) {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+      }
+
+      flowFile->setSize(fs.tellp());
+      flowFile->setOffset(0);
+      if (flowFile->getResourceClaim() != nullptr) {
+        // Remove the old claim
+        flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+        flowFile->clearResourceClaim();
+      }
+      flowFile->setResourceClaim(claim);
+      claim->increaseFlowFileRecordOwnedCount();
+
+      logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(),
+                         flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(),
+                         flowFile->getUUIDStr().c_str());
+
+      fs.close();
+      std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr();
+      uint64_t endTime = getTimeMillis();
+      provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+      flows.push_back(flowFile);
+      consumed += segment.size() + (terminated ? 1 : 0);
+    }
+    // A read error (as opposed to plain EOF) must not look like a short, complete import.
+    if (input.bad()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    }
+    input.close();
+    if (!keepSource)
+      std::remove(source.c_str());
+  } catch (std::exception &exception) {
+    if (flowFile && flowFile->getResourceClaim() == claim) {
+      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flowFile->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    if (flowFile && flowFile->getResourceClaim() == claim) {
+      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flowFile->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception during process session write");
+    throw;
+  }
+  return consumed;
+}
+
 void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> &&flow,
                             bool keepSource,
                             uint64_t offset) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index d4f1b80..46ed1fb 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -50,6 +50,8 @@ core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of t
 core::Property TailFile::StateFile("State File",
                                    "Specifies the file that should be used for storing state about"
                                    " what data has been ingested so that upon restart NiFi can resume from where it left off", "TailFileState");
+core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
+                                   " from the incoming file.", "");
 core::Relationship TailFile::Success("success", "All files are routed to success");
 
 void TailFile::initialize() {
@@ -57,6 +59,7 @@ void TailFile::initialize() {
   std::set<core::Property> properties;
   properties.insert(FileName);
   properties.insert(StateFile);
+  properties.insert(Delimiter);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -64,6 +67,14 @@ void TailFile::initialize() {
   setSupportedRelationships(relationships);
 }
 
+void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+
+  if (context->getProperty(Delimiter.getName(), value)) {
+    _delimiter = value;
+  }
+}
+
 std::string TailFile::trimLeft(const std::string& s) {
   return org::apache::nifi::minifi::utils::StringUtils::trimLeft(s);
 }
@@ -222,27 +233,59 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
   checkRollOver(fileLocation, fileName);
   std::string fullPath = fileLocation + "/" + _currentTailFileName;
   struct stat statbuf;
+
   if (stat(fullPath.c_str(), &statbuf) == 0) {
     if (statbuf.st_size <= this->_currentTailFilePosition) {
-      // there are no new input for the current tail fil
+      // there is no new input for the current tail file
       context->yield();
       return;
     }
-    std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-    if (!flowFile)
-      return;
-    std::size_t found = _currentTailFileName.find_last_of(".");
-    std::string baseName = _currentTailFileName.substr(0, found);
-    std::string extension = _currentTailFileName.substr(found + 1);
-    flowFile->updateKeyedAttribute(PATH, fileLocation);
-    flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-    session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
-    session->transfer(flowFile, Success);
-    logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize());
-    std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
-    flowFile->updateKeyedAttribute(FILENAME, logName);
-    this->_currentTailFilePosition += flowFile->getSize();
-    storeState();
+
+    std::size_t found = _currentTailFileName.find_last_of(".");
+    std::string baseName = _currentTailFileName.substr(0, found);
+    std::string extension = _currentTailFileName.substr(found + 1);
+
+    if (!this->_delimiter.empty()) {
+      // Only the first character of the configured delimiter is used.
+      char delim = this->_delimiter[0];
+      std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
+      // import() appends one FlowFile per segment and returns the bytes it consumed
+      // (delimiters included), so the stored position stays exact even when the last
+      // segment is not yet followed by a delimiter.
+      uint64_t consumed = session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
+      logger_->log_info("%d flowfiles were received from TailFile input", flowFiles.size());
+
+      uint64_t segmentStart = this->_currentTailFilePosition;
+      for (const std::shared_ptr<FlowFileRecord> &ffr : flowFiles) {
+        logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), ffr->getSize());
+        std::string logName = baseName + "." + std::to_string(segmentStart) + "-" + std::to_string(segmentStart + ffr->getSize()) + "." + extension;
+        ffr->updateKeyedAttribute(PATH, fileLocation);
+        ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+        ffr->updateKeyedAttribute(FILENAME, logName);
+        session->transfer(ffr, Success);
+        // The next segment starts after this one and the delimiter that ended it.
+        segmentStart += ffr->getSize() + 1;
+      }
+      this->_currentTailFilePosition += consumed;
+      storeState();
+
+    } else {
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+      if (!flowFile)
+        return;
+      flowFile->updateKeyedAttribute(PATH, fileLocation);
+      flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+      session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
+      session->transfer(flowFile, Success);
+      logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize());
+      std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
+      flowFile->updateKeyedAttribute(FILENAME, logName);
+      this->_currentTailFilePosition += flowFile->getSize();
+      storeState();
+    }
+
+  } else {
+    logger_->log_warn("Unable to stat file %s", fullPath.c_str());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/test/resources/TestTailFile.txt
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestTailFile.txt b/libminifi/test/resources/TestTailFile.txt
new file mode 100644
index 0000000..8b97da0
--- /dev/null
+++ b/libminifi/test/resources/TestTailFile.txt
@@ -0,0 +1,2 @@
+one,two,three
+four,five,six, seven
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b20da80e/libminifi/test/unit/TailFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp
new file mode 100644
index 0000000..e800b4c
--- /dev/null
+++ b/libminifi/test/unit/TailFileTests.cpp
@@ -0,0 +1,171 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include "FlowController.h"
+#include "../TestBase.h"
+#include "core/Core.h"
+#include "../../include/core/FlowFile.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include <iostream>
+
+static const char *NEWLINE_FILE = ""
+        "one,two,three\n"
+        "four,five,six, seven";
+static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt";
+static const char *STATE_FILE = "/tmp/minifi-state-file.txt";
+
+TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
+  // Discard state left by an earlier aborted run; otherwise TailFile would resume at EOF.
+  std::remove(STATE_FILE);
+  {
+    // Create and write to the test file
+    std::ofstream tmpfile;
+    tmpfile.open(TMP_FILE);
+    tmpfile << NEWLINE_FILE;
+    tmpfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+
+    // link the connections so that we can test results at the end for this
+    connection->setDestination(connection);
+    connection->setSourceUUID(processoruuid);
+
+    processor->addConnection(connection);
+
+    core::ProcessorNode node(processor);
+
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+
+    core::ProcessSession session(&context);
+
+    REQUIRE(processor->getName() == "tailfile");
+
+    core::ProcessSessionFactory factory(&context);
+
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(&context, &factory);
+    processor->onTrigger(&context, &session);
+
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+    std::shared_ptr<core::FlowFile> record = session.get();
+    REQUIRE(record == nullptr);
+    REQUIRE(provRecords.size() == 4);  // 2 creates and 2 modifies for flowfiles
+
+    LogTestController::getInstance().reset();
+  }
+
+  // Delete the test and state file.
+  std::remove(TMP_FILE);
+  std::remove(STATE_FILE);
+}
+
+
+TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
+  // Discard state left by an earlier aborted run; otherwise TailFile would resume at EOF.
+  std::remove(STATE_FILE);
+  {
+    // Create and write to the test file
+    std::ofstream tmpfile;
+    tmpfile.open(TMP_FILE);
+    tmpfile << NEWLINE_FILE;
+    tmpfile.close();
+
+    TestController testController;
+    LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+
+    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+    uuid_t processoruuid;
+    REQUIRE(true == processor->getUUID(processoruuid));
+    uuid_t logAttributeuuid;
+    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+
+    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
+    connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+
+    // link the connections so that we can test results at the end for this
+    connection->setDestination(connection);
+    connection->setSourceUUID(processoruuid);
+
+    processor->addConnection(connection);
+
+    core::ProcessorNode node(processor);
+
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    core::ProcessContext context(node, controller_services_provider, repo);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+
+    core::ProcessSession session(&context);
+
+    REQUIRE(processor->getName() == "tailfile");
+
+    core::ProcessSessionFactory factory(&context);
+
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(&context, &factory);
+    processor->onTrigger(&context, &session);
+
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+    std::shared_ptr<core::FlowFile> record = session.get();
+    REQUIRE(record == nullptr);
+    REQUIRE(provRecords.size() == 2);  // 1 create and 1 modify for the single flowfile
+
+    LogTestController::getInstance().reset();
+  }
+
+  // Delete the test and state file.
+  std::remove(TMP_FILE);
+  std::remove(STATE_FILE);
+}