Repository: nifi-minifi-cpp Updated Branches: refs/heads/master dad324142 -> 5fca46f7c
MINIFI-359: Add PutFile test to test a variety of conditions for the user provided input Signed-off-by: Bin Qiu <b...@apache.org> This closes #122 Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/5fca46f7 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/5fca46f7 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/5fca46f7 Branch: refs/heads/master Commit: 5fca46f7c7ee9059feb1b4926f769b932a186964 Parents: dad3241 Author: Marc Parisi <phroc...@apache.org> Authored: Mon Jul 31 10:57:28 2017 -0400 Committer: Bin Qiu <benqiu2...@gmail.com> Committed: Wed Aug 2 14:10:56 2017 -0700 ---------------------------------------------------------------------- libminifi/src/processors/PutFile.cpp | 19 +- libminifi/test/TestBase.cpp | 8 +- libminifi/test/TestBase.h | 3 + libminifi/test/unit/PutFileTests.cpp | 326 ++++++++++++++++++++++++++++++ 4 files changed, 346 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5fca46f7/libminifi/src/processors/PutFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp index 824c7d1..80aea6c 100644 --- a/libminifi/src/processors/PutFile.cpp +++ b/libminifi/src/processors/PutFile.cpp @@ -18,7 +18,7 @@ * limitations under the License. 
*/ -#include "../../include/processors/PutFile.h" +#include "processors/PutFile.h" #include <sys/stat.h> #include <unistd.h> @@ -30,13 +30,13 @@ #include <set> #include <string> -#include "../../include/core/logging/Logger.h" -#include "../../include/core/ProcessContext.h" -#include "../../include/core/Property.h" -#include "../../include/core/Relationship.h" -#include "../../include/io/BaseStream.h" -#include "../../include/io/DataStream.h" -#include "../../include/io/validation.h" +#include "core/logging/Logger.h" +#include "core/ProcessContext.h" +#include "core/Property.h" +#include "core/Relationship.h" +#include "io/BaseStream.h" +#include "io/DataStream.h" +#include "io/validation.h" namespace org { namespace apache { @@ -82,7 +82,7 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses return; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); // Do nothing if there are no incoming files if (!flowFile) { @@ -131,6 +131,7 @@ bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRec ReadCallback cb(tmpFile, destFile); session->read(flowFile, &cb); + logger_->log_info("Committing %s", destFile); if (cb.commit()) { session->transfer(flowFile, Success); return true; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5fca46f7/libminifi/test/TestBase.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index 4c0814d..a471afe 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -29,7 +29,8 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s prov_repo_(prov_repo), location(-1), finalized(false), - current_flowfile_(nullptr) { + current_flowfile_(nullptr), + 
logger_(logging::LoggerFactory<TestPlan>::getLogger()) { } std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, @@ -59,6 +60,7 @@ bool linkToPrevious) { std::stringstream connection_name; connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); + logger_->log_info("Creating %s connection for proc %d",connection_name.str(),processor_queue_.size()+1); std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); connection->setRelationship(relationship); @@ -114,11 +116,13 @@ bool linkToPrevious) { bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) { std::lock_guard<std::recursive_mutex> guard(mutex); int i = 0; + logger_->log_info("Attempting to set property %s %s for %s",prop,value,proc->getName()); for (i = 0; i < processor_queue_.size(); i++) { if (processor_queue_.at(i) == proc) { break; } } + if (i >= processor_queue_.size() || i < 0 || i >= processor_contexts_.size()) { return false; } @@ -142,6 +146,7 @@ bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core:: if (!finalized) { finalize(); } + logger_->log_info("Running next processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size()); std::lock_guard<std::recursive_mutex> guard(mutex); location++; std::shared_ptr<core::Processor> processor = processor_queue_.at(location); @@ -159,6 +164,7 @@ bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core:: if (verify != nullptr) { verify(context.get(), current_session.get()); } else { + logger_->log_info("Running %s", processor->getName()); processor->onTrigger(context.get(), current_session.get()); } current_session->commit(); 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5fca46f7/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 47db4c3..7e2a5d7 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -184,6 +184,9 @@ class TestPlan { protected: + + std::shared_ptr<logging::Logger> logger_; + void finalize(); std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5fca46f7/libminifi/test/unit/PutFileTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/PutFileTests.cpp b/libminifi/test/unit/PutFileTests.cpp new file mode 100644 index 0000000..c18c72c --- /dev/null +++ b/libminifi/test/unit/PutFileTests.cpp @@ -0,0 +1,326 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include <uuid/uuid.h> +#include <sys/stat.h> +#include <utility> +#include <memory> +#include <string> +#include <vector> +#include <set> +#include <fstream> + +#include "../TestBase.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" +#include "processors/GetFile.h" +#include "processors/PutFile.h" +#include "../unit/ProvenanceTestHelper.h" +#include "core/Core.h" +#include "core/FlowFile.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" +#include "core/reporting/SiteToSiteProvenanceReportingTask.h" + +TEST_CASE("Test Creation of PutFile", "[getfileCreate]") { + TestController testController; + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::PutFile>("processorname"); + REQUIRE(processor->getName() == "processorname"); +} + +uint64_t getModificationTime(std::string filename) { + struct stat result; + if (stat(filename.c_str(), &result) == 0) { +#if !defined(_POSIX_C_SOURCE) || defined(_DARWIN_C_SOURCE) + return result.st_mtimespec.tv_sec; +#else + return result.st_mtime; +#endif + } + return 0; +} + +TEST_CASE("PutFileTest", "[getfileputpfile]") { + TestController testController; + + LogTestController::getInstance().setDebug<minifi::processors::GetFile>(); + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::processors::PutFile>(); + LogTestController::getInstance().setDebug<minifi::processors::PutFile::ReadCallback>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + + std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2"); + + std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", 
core::Relationship("success", "description"), true); + + plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + char format2[] = "/tmp/ft.XXXXXX"; + char *putfiledir = testController.createTempDirectory(format2); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir); + plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir); + + testController.runSession(plan, false); + + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + plan->reset(); + + testController.runSession(plan, false); + + testController.runSession(plan, false); + + records = plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); + testController.runSession(plan, false); + + unlink(ss.str().c_str()); + + REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str())); + REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0")); + REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir))); + // verify that the file was moved + REQUIRE(false == std::ifstream(ss.str()).good()); + std::stringstream movedFile; + movedFile << putfiledir << "/" << "tstFile.ext"; + REQUIRE(true == std::ifstream(movedFile.str()).good()); + + file.open(movedFile.str(), std::ios::in); + std::string contents((std::istreambuf_iterator<char>(file)), + std::istreambuf_iterator<char>()); + REQUIRE("tempFile" == contents); + file.close(); + 
LogTestController::getInstance().reset(); +} + +TEST_CASE("PutFileTestFileExists", "[getfileputpfile]") { + TestController testController; + + LogTestController::getInstance().setDebug<minifi::processors::GetFile>(); + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::processors::PutFile>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + + std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2"); + + std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true); + + plan->addProcessor("LogAttribute", "logattribute", core::Relationship("failure", "description"), true); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + char format2[] = "/tmp/ft.XXXXXX"; + char *putfiledir = testController.createTempDirectory(format2); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir); + plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir); + + testController.runSession(plan, false); + + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); +// + std::stringstream movedFile; + movedFile << putfiledir << "/" << "tstFile.ext"; + file.open(movedFile.str(), std::ios::out); + file << "tempFile"; + file.close(); + + plan->reset(); + + testController.runSession(plan, false); + + testController.runSession(plan, false); + + records = 
plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); + testController.runSession(plan, false); + + unlink(ss.str().c_str()); + + REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str())); + REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0")); + REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir))); + // verify that the file was moved + REQUIRE(false == std::ifstream(ss.str()).good()); + REQUIRE(true == std::ifstream(movedFile.str()).good()); + + LogTestController::getInstance().reset(); +} + +TEST_CASE("PutFileTestFileExistsIgnore", "[getfileputpfile]") { + TestController testController; + + LogTestController::getInstance().setDebug<minifi::processors::GetFile>(); + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::processors::PutFile>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + + std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2"); + + std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true); + + plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + char format2[] = "/tmp/ft.XXXXXX"; + char *putfiledir = testController.createTempDirectory(format2); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir); + plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir); + plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::ConflictResolution.getName(), "ignore"); + + testController.runSession(plan, false); + 
+ std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); +// + std::stringstream movedFile; + movedFile << putfiledir << "/" << "tstFile.ext"; + file.open(movedFile.str(), std::ios::out); + file << "tempFile"; + file.close(); + uint64_t filemodtime = getModificationTime(movedFile.str()); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + plan->reset(); + + testController.runSession(plan, false); + + testController.runSession(plan, false); + + records = plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); + testController.runSession(plan, false); + + unlink(ss.str().c_str()); + + REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str())); + REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0")); + REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir))); + // verify that the file was moved + REQUIRE(false == std::ifstream(ss.str()).good()); + REQUIRE(true == std::ifstream(movedFile.str()).good()); + REQUIRE(filemodtime == getModificationTime(movedFile.str())); + LogTestController::getInstance().reset(); +} + +TEST_CASE("PutFileTestFileExistsReplace", "[getfileputpfile]") { + TestController testController; + + LogTestController::getInstance().setDebug<minifi::processors::GetFile>(); + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::processors::PutFile>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + + std::shared_ptr<core::Processor> getfile = 
plan->addProcessor("GetFile", "getfileCreate2"); + + std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true); + + plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + char format2[] = "/tmp/ft.XXXXXX"; + char *putfiledir = testController.createTempDirectory(format2); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir); + plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir); + plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::ConflictResolution.getName(), "replace"); + + testController.runSession(plan, false); + + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); +// + std::stringstream movedFile; + movedFile << putfiledir << "/" << "tstFile.ext"; + file.open(movedFile.str(), std::ios::out); + file << "tempFile"; + file.close(); + uint64_t filemodtime = getModificationTime(movedFile.str()); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + plan->reset(); + + testController.runSession(plan, false); + + testController.runSession(plan, false); + + records = plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); + testController.runSession(plan, false); + + unlink(ss.str().c_str()); + + REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str())); + REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0")); 
+ REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir))); + // verify that the file was moved + REQUIRE(false == std::ifstream(ss.str()).good()); + REQUIRE(true == std::ifstream(movedFile.str()).good()); + REQUIRE(filemodtime != getModificationTime(movedFile.str())); + LogTestController::getInstance().reset(); +} +