Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master dad324142 -> 5fca46f7c


MINIFI-359: Add PutFile test to test a variety of conditions for the user 
provided input

Signed-off-by: Bin Qiu <b...@apache.org>

This closes #122


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/5fca46f7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/5fca46f7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/5fca46f7

Branch: refs/heads/master
Commit: 5fca46f7c7ee9059feb1b4926f769b932a186964
Parents: dad3241
Author: Marc Parisi <phroc...@apache.org>
Authored: Mon Jul 31 10:57:28 2017 -0400
Committer: Bin Qiu <benqiu2...@gmail.com>
Committed: Wed Aug 2 14:10:56 2017 -0700

----------------------------------------------------------------------
 libminifi/src/processors/PutFile.cpp |  19 +-
 libminifi/test/TestBase.cpp          |   8 +-
 libminifi/test/TestBase.h            |   3 +
 libminifi/test/unit/PutFileTests.cpp | 326 ++++++++++++++++++++++++++++++
 4 files changed, 346 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5fca46f7/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp 
b/libminifi/src/processors/PutFile.cpp
index 824c7d1..80aea6c 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -18,7 +18,7 @@
  * limitations under the License.
  */
 
-#include "../../include/processors/PutFile.h"
+#include "processors/PutFile.h"
 
 #include <sys/stat.h>
 #include <unistd.h>
@@ -30,13 +30,13 @@
 #include <set>
 #include <string>
 
-#include "../../include/core/logging/Logger.h"
-#include "../../include/core/ProcessContext.h"
-#include "../../include/core/Property.h"
-#include "../../include/core/Relationship.h"
-#include "../../include/io/BaseStream.h"
-#include "../../include/io/DataStream.h"
-#include "../../include/io/validation.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Property.h"
+#include "core/Relationship.h"
+#include "io/BaseStream.h"
+#include "io/DataStream.h"
+#include "io/validation.h"
 
 namespace org {
 namespace apache {
@@ -82,7 +82,7 @@ void PutFile::onTrigger(core::ProcessContext *context, 
core::ProcessSession *ses
     return;
   }
 
-  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->get());
+  std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->get());
 
   // Do nothing if there are no incoming files
   if (!flowFile) {
@@ -131,6 +131,7 @@ bool PutFile::putFile(core::ProcessSession *session, 
std::shared_ptr<FlowFileRec
   ReadCallback cb(tmpFile, destFile);
   session->read(flowFile, &cb);
 
+  logger_->log_info("Committing %s", destFile);
   if (cb.commit()) {
     session->transfer(flowFile, Success);
     return true;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5fca46f7/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 4c0814d..a471afe 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -29,7 +29,8 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> 
content_repo, std::s
       prov_repo_(prov_repo),
       location(-1),
       finalized(false),
-      current_flowfile_(nullptr) {
+      current_flowfile_(nullptr),
+      logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
 }
 
 std::shared_ptr<core::Processor> TestPlan::addProcessor(const 
std::shared_ptr<core::Processor> &processor, const std::string &name, 
core::Relationship relationship,
@@ -59,6 +60,7 @@ bool linkToPrevious) {
 
     std::stringstream connection_name;
     connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
+    logger_->log_info("Creating %s connection for proc 
%d",connection_name.str(),processor_queue_.size()+1);
     std::shared_ptr<minifi::Connection> connection = 
std::make_shared<minifi::Connection>(flow_repo_, content_repo_, 
connection_name.str());
     connection->setRelationship(relationship);
 
@@ -114,11 +116,13 @@ bool linkToPrevious) {
 bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const 
std::string &prop, const std::string &value) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   int i = 0;
+  logger_->log_info("Attempting to set property %s %s for 
%s",prop,value,proc->getName());
   for (i = 0; i < processor_queue_.size(); i++) {
     if (processor_queue_.at(i) == proc) {
       break;
     }
   }
+
   if (i >= processor_queue_.size() || i < 0 || i >= 
processor_contexts_.size()) {
     return false;
   }
@@ -142,6 +146,7 @@ bool 
TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::
   if (!finalized) {
     finalize();
   }
+  logger_->log_info("Running next processor %d, processor_queue_.size %d, 
processor_contexts_.size %d", location, processor_queue_.size(), 
processor_contexts_.size());
   std::lock_guard<std::recursive_mutex> guard(mutex);
   location++;
   std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
@@ -159,6 +164,7 @@ bool 
TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::
   if (verify != nullptr) {
     verify(context.get(), current_session.get());
   } else {
+    logger_->log_info("Running %s", processor->getName());
     processor->onTrigger(context.get(), current_session.get());
   }
   current_session->commit();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5fca46f7/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 47db4c3..7e2a5d7 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -184,6 +184,9 @@ class TestPlan {
 
  protected:
 
+
+  std::shared_ptr<logging::Logger> logger_;
+
   void finalize();
 
   std::shared_ptr<minifi::Connection> 
buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = 
false);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5fca46f7/libminifi/test/unit/PutFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/PutFileTests.cpp 
b/libminifi/test/unit/PutFileTests.cpp
new file mode 100644
index 0000000..c18c72c
--- /dev/null
+++ b/libminifi/test/unit/PutFileTests.cpp
@@ -0,0 +1,326 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define CATCH_CONFIG_MAIN  // This tells Catch to provide a main() - only do 
this in one cpp file
+#include <uuid/uuid.h>
+#include <sys/stat.h>
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "../TestBase.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+#include "processors/GetFile.h"
+#include "processors/PutFile.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "core/Core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+
+TEST_CASE("Test Creation of PutFile", "[getfileCreate]") {
+  TestController testController;
+  std::shared_ptr<core::Processor> processor = 
std::make_shared<org::apache::nifi::minifi::processors::PutFile>("processorname");
+  REQUIRE(processor->getName() == "processorname");
+}
+
+uint64_t getModificationTime(std::string filename) {
+  struct stat result;
+  if (stat(filename.c_str(), &result) == 0) {
+#if !defined(_POSIX_C_SOURCE) || defined(_DARWIN_C_SOURCE)
+    return result.st_mtimespec.tv_sec;
+#else
+    return result.st_mtime;
+#endif
+  }
+  return 0;
+}
+
+TEST_CASE("PutFileTest", "[getfileputpfile]") {
+  TestController testController;
+
+  LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+  LogTestController::getInstance().setDebug<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
+  
LogTestController::getInstance().setDebug<minifi::processors::PutFile::ReadCallback>();
+  
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", 
"getfileCreate2");
+
+  std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", 
"putfile", core::Relationship("success", "description"), true);
+
+  plan->addProcessor("LogAttribute", "logattribute", 
core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  char format2[] = "/tmp/ft.XXXXXX";
+  char *putfiledir = testController.createTempDirectory(format2);
+  plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
putfiledir);
+
+  testController.runSession(plan, false);
+
+  std::set<provenance::ProvenanceEventRecord*> records = 
plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+  plan->reset();
+
+  testController.runSession(plan, false);
+
+  testController.runSession(plan, false);
+
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+  testController.runSession(plan, false);
+
+  unlink(ss.str().c_str());
+
+  REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path 
value:" + ss.str()));
+  REQUIRE(true == LogTestController::getInstance().contains("Size:8 
Offset:0"));
+  REQUIRE(true == LogTestController::getInstance().contains("key:path value:" 
+ std::string(dir)));
+  // verify that the fle was moved
+  REQUIRE(false == std::ifstream(ss.str()).good());
+  std::stringstream movedFile;
+  movedFile << putfiledir << "/" << "tstFile.ext";
+  REQUIRE(true == std::ifstream(movedFile.str()).good());
+
+  file.open(movedFile.str(), std::ios::in);
+  std::string contents((std::istreambuf_iterator<char>(file)),
+                       std::istreambuf_iterator<char>());
+  REQUIRE("tempFile" == contents);
+  file.close();
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("PutFileTestFileExists", "[getfileputpfile]") {
+  TestController testController;
+
+  LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+  LogTestController::getInstance().setDebug<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
+  
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", 
"getfileCreate2");
+
+  std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", 
"putfile", core::Relationship("success", "description"), true);
+
+  plan->addProcessor("LogAttribute", "logattribute", 
core::Relationship("failure", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  char format2[] = "/tmp/ft.XXXXXX";
+  char *putfiledir = testController.createTempDirectory(format2);
+  plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
putfiledir);
+
+  testController.runSession(plan, false);
+
+  std::set<provenance::ProvenanceEventRecord*> records = 
plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+//
+  std::stringstream movedFile;
+  movedFile << putfiledir << "/" << "tstFile.ext";
+  file.open(movedFile.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  plan->reset();
+
+  testController.runSession(plan, false);
+
+  testController.runSession(plan, false);
+
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+  testController.runSession(plan, false);
+
+  unlink(ss.str().c_str());
+
+  REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path 
value:" + ss.str()));
+  REQUIRE(true == LogTestController::getInstance().contains("Size:8 
Offset:0"));
+  REQUIRE(true == LogTestController::getInstance().contains("key:path value:" 
+ std::string(dir)));
+  // verify that the fle was moved
+  REQUIRE(false == std::ifstream(ss.str()).good());
+  REQUIRE(true == std::ifstream(movedFile.str()).good());
+
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("PutFileTestFileExistsIgnore", "[getfileputpfile]") {
+  TestController testController;
+
+  LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+  LogTestController::getInstance().setDebug<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
+  
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", 
"getfileCreate2");
+
+  std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", 
"putfile", core::Relationship("success", "description"), true);
+
+  plan->addProcessor("LogAttribute", "logattribute", 
core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  char format2[] = "/tmp/ft.XXXXXX";
+  char *putfiledir = testController.createTempDirectory(format2);
+  plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
putfiledir);
+  plan->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::ConflictResolution.getName(), 
"ignore");
+
+  testController.runSession(plan, false);
+
+  std::set<provenance::ProvenanceEventRecord*> records = 
plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+//
+  std::stringstream movedFile;
+  movedFile << putfiledir << "/" << "tstFile.ext";
+  file.open(movedFile.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+  uint64_t filemodtime = getModificationTime(movedFile.str());
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  plan->reset();
+
+  testController.runSession(plan, false);
+
+  testController.runSession(plan, false);
+
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+  testController.runSession(plan, false);
+
+  unlink(ss.str().c_str());
+
+  REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path 
value:" + ss.str()));
+  REQUIRE(true == LogTestController::getInstance().contains("Size:8 
Offset:0"));
+  REQUIRE(true == LogTestController::getInstance().contains("key:path value:" 
+ std::string(dir)));
+  // verify that the fle was moved
+  REQUIRE(false == std::ifstream(ss.str()).good());
+  REQUIRE(true == std::ifstream(movedFile.str()).good());
+  REQUIRE(filemodtime == getModificationTime(movedFile.str()));
+  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("PutFileTestFileExistsReplace", "[getfileputpfile]") {
+  TestController testController;
+
+  LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+  LogTestController::getInstance().setDebug<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
+  
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", 
"getfileCreate2");
+
+  std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", 
"putfile", core::Relationship("success", "description"), true);
+
+  plan->addProcessor("LogAttribute", "logattribute", 
core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  char format2[] = "/tmp/ft.XXXXXX";
+  char *putfiledir = testController.createTempDirectory(format2);
+  plan->setProperty(getfile, 
org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+  plan->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::Directory.getName(), 
putfiledir);
+  plan->setProperty(putfile, 
org::apache::nifi::minifi::processors::PutFile::ConflictResolution.getName(), 
"replace");
+
+  testController.runSession(plan, false);
+
+  std::set<provenance::ProvenanceEventRecord*> records = 
plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+//
+  std::stringstream movedFile;
+  movedFile << putfiledir << "/" << "tstFile.ext";
+  file.open(movedFile.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+  uint64_t filemodtime = getModificationTime(movedFile.str());
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  plan->reset();
+
+  testController.runSession(plan, false);
+
+  testController.runSession(plan, false);
+
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+  testController.runSession(plan, false);
+
+  unlink(ss.str().c_str());
+
+  REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path 
value:" + ss.str()));
+  REQUIRE(true == LogTestController::getInstance().contains("Size:8 
Offset:0"));
+  REQUIRE(true == LogTestController::getInstance().contains("key:path value:" 
+ std::string(dir)));
+  // verify that the fle was moved
+  REQUIRE(false == std::ifstream(ss.str()).good());
+  REQUIRE(true == std::ifstream(movedFile.str()).good());
+  REQUIRE(filemodtime != getModificationTime(movedFile.str()));
+  LogTestController::getInstance().reset();
+}
+

Reply via email to