MINIFI-227: Add test case

Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a94ac905
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a94ac905
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a94ac905

Branch: refs/heads/MINIFI-227
Commit: a94ac90587639927b0967b950a77278017cd2541
Parents: eac25c2
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Mon Apr 10 16:38:06 2017 -0700
Committer: Bin Qiu <benqiu2...@gmail.com>
Committed: Mon Apr 10 16:38:06 2017 -0700

----------------------------------------------------------------------
 .../include/provenance/ProvenanceTaskReport.h   |   7 +-
 .../src/provenance/ProvenanceTaskReport.cpp     | 106 ++++++++++---------
 libminifi/test/unit/ProcessorTests.cpp          |  98 ++++++++++-------
 libminifi/test/unit/ProvenanceTestHelper.h      |  44 +++++---
 4 files changed, 154 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a94ac905/libminifi/include/provenance/ProvenanceTaskReport.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceTaskReport.h 
b/libminifi/include/provenance/ProvenanceTaskReport.h
index 4123593..932ed39 100644
--- a/libminifi/include/provenance/ProvenanceTaskReport.h
+++ b/libminifi/include/provenance/ProvenanceTaskReport.h
@@ -44,7 +44,8 @@ public:
        ProvenanceTaskReport(std::string name, uuid_t uuid = NULL) :
                        core::Processor(name, uuid) {
                logger_ = logging::Logger::getLogger();
-               uuid_copy(protocol_uuid_,uuid);
+               if (uuid)
+                 uuid_copy(protocol_uuid_,uuid);
                this->setTriggerWhenEmpty(true);
        }
        //! Destructor
@@ -62,6 +63,10 @@ public:
        static core::Relationship relation;
        static const char *ProvenanceAppStr;
 public:
+       //! Get provenance jason report
+       void getJasonReport(core::ProcessContext *context,
+           core::ProcessSession *session, std::vector < std::shared_ptr < 
ProvenanceEventRecord >> &records,
+           std::string &report);
        //! OnTrigger method, implemented by NiFi ProvenanceTaskReport
        virtual void onTrigger(core::ProcessContext *context, 
core::ProcessSession *session);
        //! Initialize, over write by NiFi ProvenanceTaskReport

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a94ac905/libminifi/src/provenance/ProvenanceTaskReport.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceTaskReport.cpp 
b/libminifi/src/provenance/ProvenanceTaskReport.cpp
index 082fced..dfe81e8 100644
--- a/libminifi/src/provenance/ProvenanceTaskReport.cpp
+++ b/libminifi/src/provenance/ProvenanceTaskReport.cpp
@@ -68,6 +68,58 @@ void ProvenanceTaskReport::initialize() {
   setSupportedRelationships(relationships);
 }
 
+void ProvenanceTaskReport::getJasonReport(core::ProcessContext *context,
+    core::ProcessSession *session,
+    std::vector<std::shared_ptr<ProvenanceEventRecord>> &records,
+    std::string &report) {
+
+  Json::Value array;
+  for (auto record : records) {
+    Json::Value recordJson;
+    Json::Value updatedAttributesJson;
+    Json::Value parentUuidJson;
+    Json::Value childUuidJson;
+    recordJson["eventId"] = record->getEventId().c_str();
+    recordJson["eventType"] =
+        ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
+    recordJson["timestampMillis"] = record->getEventTime();
+    recordJson["durationMillis"] = record->getEventDuration();
+    recordJson["lineageStart"] = record->getlineageStartDate();
+    recordJson["details"] = record->getDetails().c_str();
+    recordJson["componentId"] = record->getComponentId().c_str();
+    recordJson["componentType"] = record->getComponentType().c_str();
+    recordJson["entityId"] = record->getFlowFileUuid().c_str();
+    recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile";
+    recordJson["entitySize"] = record->getFileSize();
+    recordJson["entityOffset"] = record->getFileOffset();
+
+    for (auto attr : record->getAttributes()) {
+      updatedAttributesJson[attr.first] = attr.second;
+    }
+    recordJson["updatedAttributes"] = updatedAttributesJson;
+
+    for (auto parentUUID : record->getParentUuids()) {
+      parentUuidJson.append(parentUUID.c_str());
+    }
+    recordJson["parentIds"] = parentUuidJson;
+
+    for (auto childUUID : record->getChildrenUuids()) {
+      childUuidJson.append(childUUID.c_str());
+    }
+    recordJson["childIds"] = childUuidJson;
+    recordJson["transitUri"] = record->getTransitUri().c_str();
+    recordJson["remoteIdentifier"] =
+        record->getSourceSystemFlowFileIdentifier().c_str();
+    recordJson["alternateIdentifier"] =
+        record->getAlternateIdentifierUri().c_str();
+    recordJson["application"] = ProvenanceAppStr;
+    array.append(recordJson);
+  }
+
+  Json::StyledWriter writer;
+  report = writer.write(array);
+}
+
 void ProvenanceTaskReport::onTrigger(core::ProcessContext *context,
     core::ProcessSession *session) {
   std::string value;
@@ -106,69 +158,26 @@ void ProvenanceTaskReport::onTrigger(core::ProcessContext 
*context,
   }
 
   int64_t batch = 100;
-
   if (context->getProperty(batchSize.getName(), value)
       && core::Property::StringToInt(value, lvalue)) {
     batch = lvalue;
   }
-
   std::vector < std::shared_ptr < ProvenanceEventRecord >> records;
   std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast
       < ProvenanceRepository > (context->getProvenanceRepository());
-
   repo->getProvenanceRecord(records, batch);
-
   if (records.size() <= 0) {
     returnSite2SiteProtocol(protocol_);
     return;
   }
 
-  Json::Value array;
-  for (auto record : records) {
-    Json::Value recordJson;
-    Json::Value updatedAttributesJson;
-    Json::Value parentUuidJson;
-    Json::Value childUuidJson;
-    recordJson["eventId"] = record->getEventId().c_str();
-    recordJson["eventType"] =
-        ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()];
-    recordJson["timestampMillis"] = record->getEventTime();
-    recordJson["durationMillis"] = record->getEventDuration();
-    recordJson["lineageStart"] = record->getlineageStartDate();
-    recordJson["details"] = record->getDetails().c_str();
-    recordJson["componentId"] = record->getComponentId().c_str();
-    recordJson["componentType"] = record->getComponentType().c_str();
-    recordJson["entityId"] = record->getFlowFileUuid().c_str();
-    recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile";
-    recordJson["entitySize"] = record->getFileSize();
-    recordJson["entityOffset"] = record->getFileOffset();
-
-    for (auto attr : record->getAttributes()) {
-      updatedAttributesJson[attr.first] = attr.second;
-    }
-    recordJson["updatedAttributes"] = updatedAttributesJson;
-
-    for (auto parentUUID : record->getParentUuids()) {
-      parentUuidJson.append(parentUUID.c_str());
-    }
-    recordJson["parentIds"] = parentUuidJson;
-
-    for (auto childUUID : record->getChildrenUuids()) {
-      childUuidJson.append(childUUID.c_str());
-    }
-    recordJson["childIds"] = childUuidJson;
-    recordJson["transitUri"] = record->getTransitUri().c_str();
-    recordJson["remoteIdentifier"] =
-        record->getSourceSystemFlowFileIdentifier().c_str();
-    recordJson["alternateIdentifier"] =
-        record->getAlternateIdentifierUri().c_str();
-    recordJson["application"] = ProvenanceAppStr;
-    array.append(recordJson);
+  std::string jsonStr;
+  this->getJasonReport(context, session, records, jsonStr);
+  if (jsonStr.length() <= 0) {
+    returnSite2SiteProtocol(protocol_);
+    return;
   }
 
-  Json::StyledWriter writer;
-  std::string jsonStr = writer.write(array);
-
   try {
     std::map < std::string, std::string > attributes;
     protocol_->transferString(context, session, jsonStr, attributes);
@@ -179,7 +188,6 @@ void ProvenanceTaskReport::onTrigger(core::ProcessContext 
*context,
 
   // we transfer the record, purge the record from DB
   repo->purgeProvenanceRecord(records);
-
   returnSite2SiteProtocol(protocol_);
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a94ac905/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp 
b/libminifi/test/unit/ProcessorTests.cpp
index 87f190c..1653e2d 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -41,16 +41,20 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
 
   testController.enableDebug();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
+
+  std::shared_ptr<core::Processor> processorReport = std::make_shared
+      < org::apache::nifi::minifi::provenance::ProvenanceTaskReport
+      > ("provenanceTaskReport");
 
   std::shared_ptr<core::Repository> test_repo =
       std::make_shared<TestRepository>();
 
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+      < TestRepository > (test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared
+      < TestFlowController > (test_repo, test_repo);
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -58,8 +62,8 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   uuid_t processoruuid;
   REQUIRE(true == processor->getUUID(processoruuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (test_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -77,7 +81,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   core::ProcessContext context(node, test_repo);
   core::ProcessSessionFactory factory(&context);
   
context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   core::ProcessSession session(&context);
 
   processor->onSchedule(&context, &factory);
@@ -122,7 +126,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   for (auto entry : repo->getRepoMap()) {
     provenance::ProvenanceEventRecord newRecord;
     newRecord.DeSerialize((uint8_t*) entry.second.data(),
-                          entry.second.length());
+        entry.second.length());
 
     bool found = false;
     for (auto provRec : records) {
@@ -141,6 +145,26 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
 
   }
 
+  core::ProcessorNode nodeReport(processorReport);
+  core::ProcessContext contextReport(nodeReport, test_repo);
+  core::ProcessSessionFactory factoryReport(&contextReport);
+  
contextReport.setProperty(org::apache::nifi::minifi::provenance::ProvenanceTaskReport::batchSize,
+      "1");
+  core::ProcessSession sessionReport(&contextReport);
+  processorReport->onSchedule(&contextReport, &factoryReport);
+  std::shared_ptr<org::apache::nifi::minifi::provenance::ProvenanceTaskReport> 
taskReport = std::static_pointer_cast
+        < org::apache::nifi::minifi::provenance::ProvenanceTaskReport > 
(processorReport);
+  std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> 
recordsReport;
+  processorReport->incrementActiveTasks();
+  processorReport->setScheduledState(core::ScheduledState::RUNNING);
+  std::string jsonStr;
+  repo->getProvenanceRecord(recordsReport, 1);
+  taskReport->getJasonReport(&contextReport, &sessionReport, recordsReport, 
jsonStr);
+  REQUIRE(recordsReport.size() == 1);
+  REQUIRE(taskReport->getName() == "provenanceTaskReport");
+  REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != 
std::string::npos);
+  REQUIRE(jsonStr.find("\"filename\" : \"tstFile.ext\"") != std::string::npos);
+
 }
 
 TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
@@ -149,16 +173,16 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", 
"[getfileCreate3]") {
 
   testController.enableDebug();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
 
   std::shared_ptr<core::Repository> test_repo =
       std::make_shared<TestRepository>();
 
-  std::shared_ptr<TestRepository> repo =
-      std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
-      TestFlowController>(test_repo, test_repo);
+  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
+      < TestRepository > (test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared
+      < TestFlowController > (test_repo, test_repo);
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -166,8 +190,8 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", 
"[getfileCreate3]") {
   uuid_t processoruuid;
   REQUIRE(true == processor->getUUID(processoruuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (test_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -184,7 +208,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", 
"[getfileCreate3]") {
   core::ProcessContext context(node, test_repo);
   core::ProcessSessionFactory factory(&context);
   
context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   // replicate 10 threads
   processor->setScheduledState(core::ScheduledState::RUNNING);
   processor->onSchedule(&context, &factory);
@@ -229,9 +253,9 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", 
"[getfileCreate3]") {
     session.commit();
     std::shared_ptr<core::FlowFile> ffr = session.get();
 
-    REQUIRE((repo->getRepoMap().size()%2) == 0);
-    REQUIRE(repo->getRepoMap().size() == (prev+2));
-    prev+=2;
+    REQUIRE((repo->getRepoMap().size() % 2) == 0);
+    REQUIRE(repo->getRepoMap().size() == (prev + 2));
+    prev += 2;
 
   }
 
@@ -239,10 +263,10 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", 
"[getfileCreate3]") {
 
 TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   std::ostringstream oss;
-  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
-      logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
-                                                                         0));
+  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr
+      < logging::BaseLogger
+      > (new 
org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+          0));
   std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
   logger->updateLogger(std::move(outputLogger));
 
@@ -252,11 +276,11 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
 
   std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
 
-  std::shared_ptr<core::Processor> processor = std::make_shared<
-      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+  std::shared_ptr<core::Processor> processor = std::make_shared
+      < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2");
 
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<
-      org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared
+      < org::apache::nifi::minifi::processors::LogAttribute > ("logattribute");
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -267,12 +291,12 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   uuid_t logattribute_uuid;
   REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
 
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo, "getfileCreate2Connection");
+  std::shared_ptr<minifi::Connection> connection = std::make_shared
+      < minifi::Connection > (repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo, "logattribute");
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared
+      < minifi::Connection > (repo, "logattribute");
   connection2->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -298,7 +322,7 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   core::ProcessContext context(node, repo);
   core::ProcessContext context2(node2, repo);
   
context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
-                      dir);
+      dir);
   core::ProcessSession session(&context);
   core::ProcessSession session2(&context2);
 
@@ -357,8 +381,8 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
       log_attribute_output.find("key:path value:" + std::string(dir))
           != std::string::npos);
 
-  outputLogger = std::unique_ptr<logging::BaseLogger>(
-      new org::apache::nifi::minifi::core::logging::NullAppender());
+  outputLogger = std::unique_ptr < logging::BaseLogger
+      > (new org::apache::nifi::minifi::core::logging::NullAppender());
   logger->updateLogger(std::move(outputLogger));
 
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a94ac905/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index 80d8642..67039bf 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -25,10 +25,10 @@
 /**
  * Test repository
  */
-class TestRepository : public core::Repository {
- public:
-  TestRepository()
-      : Repository("repo_name", "./dir", 1000, 100, 0) {
+class TestRepository: public core::Repository {
+public:
+  TestRepository() :
+      Repository("repo_name", "./dir", 1000, 100, 0) {
   }
   // initialize
   bool initialize() {
@@ -42,8 +42,8 @@ class TestRepository : public core::Repository {
 
   bool Put(std::string key, uint8_t *buf, int bufLen) {
     repositoryResults.insert(
-        std::pair<std::string, std::string>(
-            key, std::string((const char*) buf, bufLen)));
+        std::pair<std::string, std::string>(key,
+            std::string((const char*) buf, bufLen)));
     return true;
   }
   // Delete
@@ -66,19 +66,35 @@ class TestRepository : public core::Repository {
     return repositoryResults;
   }
 
+  void getProvenanceRecord(
+      std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records,
+      int maxSize) {
+    for (auto entry : repositoryResults) {
+      if (records.size() >= maxSize)
+        break;
+      std::shared_ptr<provenance::ProvenanceEventRecord> eventRead =
+          std::make_shared<provenance::ProvenanceEventRecord>();
+
+      if (eventRead->DeSerialize((uint8_t*) entry.second.data(),
+          entry.second.length())) {
+        records.push_back(eventRead);
+      }
+    }
+  }
+
   void run() {
     // do nothing
   }
- protected:
+protected:
   std::map<std::string, std::string> repositoryResults;
 };
 
-class TestFlowController : public minifi::FlowController {
+class TestFlowController: public minifi::FlowController {
 
- public:
+public:
   TestFlowController(std::shared_ptr<core::Repository> repo,
-                     std::shared_ptr<core::Repository> flow_file_repo)
-      : minifi::FlowController(repo, flow_file_repo, nullptr, "",true) {
+      std::shared_ptr<core::Repository> flow_file_repo) :
+      minifi::FlowController(repo, flow_file_repo, nullptr, "", true) {
   }
   ~TestFlowController() {
 
@@ -112,7 +128,7 @@ class TestFlowController : public minifi::FlowController {
   }
 
   std::shared_ptr<core::Processor> createProcessor(std::string name,
-                                                   uuid_t uuid) {
+      uuid_t uuid) {
     return 0;
   }
 
@@ -125,10 +141,10 @@ class TestFlowController : public minifi::FlowController {
   }
 
   std::shared_ptr<minifi::Connection> createConnection(std::string name,
-                                                       uuid_t uuid) {
+      uuid_t uuid) {
     return 0;
   }
- protected:
+protected:
   void initializePaths(const std::string &adjustedFilename) {
   }
 };

Reply via email to