MINIFI-227: Add test case
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a94ac905 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a94ac905 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a94ac905 Branch: refs/heads/MINIFI-227 Commit: a94ac90587639927b0967b950a77278017cd2541 Parents: eac25c2 Author: Bin Qiu <benqiu2...@gmail.com> Authored: Mon Apr 10 16:38:06 2017 -0700 Committer: Bin Qiu <benqiu2...@gmail.com> Committed: Mon Apr 10 16:38:06 2017 -0700 ---------------------------------------------------------------------- .../include/provenance/ProvenanceTaskReport.h | 7 +- .../src/provenance/ProvenanceTaskReport.cpp | 106 ++++++++++--------- libminifi/test/unit/ProcessorTests.cpp | 98 ++++++++++------- libminifi/test/unit/ProvenanceTestHelper.h | 44 +++++--- 4 files changed, 154 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a94ac905/libminifi/include/provenance/ProvenanceTaskReport.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceTaskReport.h b/libminifi/include/provenance/ProvenanceTaskReport.h index 4123593..932ed39 100644 --- a/libminifi/include/provenance/ProvenanceTaskReport.h +++ b/libminifi/include/provenance/ProvenanceTaskReport.h @@ -44,7 +44,8 @@ public: ProvenanceTaskReport(std::string name, uuid_t uuid = NULL) : core::Processor(name, uuid) { logger_ = logging::Logger::getLogger(); - uuid_copy(protocol_uuid_,uuid); + if (uuid) + uuid_copy(protocol_uuid_,uuid); this->setTriggerWhenEmpty(true); } //! Destructor @@ -62,6 +63,10 @@ public: static core::Relationship relation; static const char *ProvenanceAppStr; public: + //! Get provenance jason report + void getJasonReport(core::ProcessContext *context, + core::ProcessSession *session, std::vector < std::shared_ptr < ProvenanceEventRecord >> &records, + std::string &report); //! OnTrigger method, implemented by NiFi ProvenanceTaskReport virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); //! Initialize, over write by NiFi ProvenanceTaskReport http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a94ac905/libminifi/src/provenance/ProvenanceTaskReport.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceTaskReport.cpp b/libminifi/src/provenance/ProvenanceTaskReport.cpp index 082fced..dfe81e8 100644 --- a/libminifi/src/provenance/ProvenanceTaskReport.cpp +++ b/libminifi/src/provenance/ProvenanceTaskReport.cpp @@ -68,6 +68,58 @@ void ProvenanceTaskReport::initialize() { setSupportedRelationships(relationships); } +void ProvenanceTaskReport::getJasonReport(core::ProcessContext *context, + core::ProcessSession *session, + std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, + std::string &report) { + + Json::Value array; + for (auto record : records) { + Json::Value recordJson; + Json::Value updatedAttributesJson; + Json::Value parentUuidJson; + Json::Value childUuidJson; + recordJson["eventId"] = record->getEventId().c_str(); + recordJson["eventType"] = + ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()]; + recordJson["timestampMillis"] = record->getEventTime(); + recordJson["durationMillis"] = record->getEventDuration(); + recordJson["lineageStart"] = record->getlineageStartDate(); + recordJson["details"] = record->getDetails().c_str(); + recordJson["componentId"] = record->getComponentId().c_str(); + recordJson["componentType"] = record->getComponentType().c_str(); + recordJson["entityId"] = record->getFlowFileUuid().c_str(); + recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile"; + recordJson["entitySize"] = record->getFileSize(); + recordJson["entityOffset"] = record->getFileOffset(); + + for (auto attr : record->getAttributes()) { + updatedAttributesJson[attr.first] = attr.second; + } + recordJson["updatedAttributes"] = updatedAttributesJson; + + for (auto parentUUID : record->getParentUuids()) { + parentUuidJson.append(parentUUID.c_str()); + } + recordJson["parentIds"] = parentUuidJson; + + for (auto childUUID : record->getChildrenUuids()) { + childUuidJson.append(childUUID.c_str()); + } + recordJson["childIds"] = childUuidJson; + recordJson["transitUri"] = record->getTransitUri().c_str(); + recordJson["remoteIdentifier"] = + record->getSourceSystemFlowFileIdentifier().c_str(); + recordJson["alternateIdentifier"] = + record->getAlternateIdentifierUri().c_str(); + recordJson["application"] = ProvenanceAppStr; + array.append(recordJson); + } + + Json::StyledWriter writer; + report = writer.write(array); +} + void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::string value; @@ -106,69 +158,26 @@ void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, } int64_t batch = 100; - if (context->getProperty(batchSize.getName(), value) && core::Property::StringToInt(value, lvalue)) { batch = lvalue; } - std::vector < std::shared_ptr < ProvenanceEventRecord >> records; std::shared_ptr<ProvenanceRepository> repo = std::static_pointer_cast < ProvenanceRepository > (context->getProvenanceRepository()); - repo->getProvenanceRecord(records, batch); - if (records.size() <= 0) { returnSite2SiteProtocol(protocol_); return; } - Json::Value array; - for (auto record : records) { - Json::Value recordJson; - Json::Value updatedAttributesJson; - Json::Value parentUuidJson; - Json::Value childUuidJson; - recordJson["eventId"] = record->getEventId().c_str(); - recordJson["eventType"] = - ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()]; - recordJson["timestampMillis"] = record->getEventTime(); - recordJson["durationMillis"] = record->getEventDuration(); - recordJson["lineageStart"] = record->getlineageStartDate(); - recordJson["details"] = record->getDetails().c_str(); - recordJson["componentId"] = record->getComponentId().c_str(); - recordJson["componentType"] = record->getComponentType().c_str(); - recordJson["entityId"] = record->getFlowFileUuid().c_str(); - recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile"; - recordJson["entitySize"] = record->getFileSize(); - recordJson["entityOffset"] = record->getFileOffset(); - - for (auto attr : record->getAttributes()) { - updatedAttributesJson[attr.first] = attr.second; - } - recordJson["updatedAttributes"] = updatedAttributesJson; - - for (auto parentUUID : record->getParentUuids()) { - parentUuidJson.append(parentUUID.c_str()); - } - recordJson["parentIds"] = parentUuidJson; - - for (auto childUUID : record->getChildrenUuids()) { - childUuidJson.append(childUUID.c_str()); - } - recordJson["childIds"] = childUuidJson; - recordJson["transitUri"] = record->getTransitUri().c_str(); - recordJson["remoteIdentifier"] = - record->getSourceSystemFlowFileIdentifier().c_str(); - recordJson["alternateIdentifier"] = - record->getAlternateIdentifierUri().c_str(); - recordJson["application"] = ProvenanceAppStr; - array.append(recordJson); + std::string jsonStr; + this->getJasonReport(context, session, records, jsonStr); + if (jsonStr.length() <= 0) { + returnSite2SiteProtocol(protocol_); + return; } - Json::StyledWriter writer; - std::string jsonStr = writer.write(array); - try { std::map < std::string, std::string > attributes; protocol_->transferString(context, session, jsonStr, attributes); @@ -179,7 +188,6 @@ void ProvenanceTaskReport::onTrigger(core::ProcessContext *context, // we transfer the record, purge the record from DB repo->purgeProvenanceRecord(records); - returnSite2SiteProtocol(protocol_); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a94ac905/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 87f190c..1653e2d 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -41,16 +41,20 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { testController.enableDebug(); - std::shared_ptr<core::Processor> processor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + std::shared_ptr<core::Processor> processor = std::make_shared + < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2"); + + std::shared_ptr<core::Processor> processorReport = std::make_shared + < org::apache::nifi::minifi::provenance::ProvenanceTaskReport + > ("provenanceTaskReport"); std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - std::shared_ptr<TestRepository> repo = - std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared< - TestFlowController>(test_repo, test_repo); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast + < TestRepository > (test_repo); + std::shared_ptr<minifi::FlowController> controller = std::make_shared + < TestFlowController > (test_repo, test_repo); char format[] = "/tmp/gt.XXXXXX"; char *dir = testController.createTempDirectory(format); @@ -58,8 +62,8 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { uuid_t processoruuid; REQUIRE(true == processor->getUUID(processoruuid)); - std::shared_ptr<minifi::Connection> connection = std::make_shared< - minifi::Connection>(test_repo, "getfileCreate2Connection"); + std::shared_ptr<minifi::Connection> connection = std::make_shared + < minifi::Connection > (test_repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); // link the connections so that we can test results at the end for this @@ -77,7 +81,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { core::ProcessContext context(node, test_repo); core::ProcessSessionFactory factory(&context); context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, - dir); + dir); core::ProcessSession session(&context); processor->onSchedule(&context, &factory); @@ -122,7 +126,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { for (auto entry : repo->getRepoMap()) { provenance::ProvenanceEventRecord newRecord; newRecord.DeSerialize((uint8_t*) entry.second.data(), - entry.second.length()); + entry.second.length()); bool found = false; for (auto provRec : records) { @@ -141,6 +145,26 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { } + core::ProcessorNode nodeReport(processorReport); + core::ProcessContext contextReport(nodeReport, test_repo); + core::ProcessSessionFactory factoryReport(&contextReport); + contextReport.setProperty(org::apache::nifi::minifi::provenance::ProvenanceTaskReport::batchSize, + "1"); + core::ProcessSession sessionReport(&contextReport); + processorReport->onSchedule(&contextReport, &factoryReport); + std::shared_ptr<org::apache::nifi::minifi::provenance::ProvenanceTaskReport> taskReport = std::static_pointer_cast + < org::apache::nifi::minifi::provenance::ProvenanceTaskReport > (processorReport); + std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> recordsReport; + processorReport->incrementActiveTasks(); + processorReport->setScheduledState(core::ScheduledState::RUNNING); + std::string jsonStr; + repo->getProvenanceRecord(recordsReport, 1); + taskReport->getJasonReport(&contextReport, &sessionReport, recordsReport, jsonStr); + REQUIRE(recordsReport.size() == 1); + REQUIRE(taskReport->getName() == "provenanceTaskReport"); + REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos); + REQUIRE(jsonStr.find("\"filename\" : \"tstFile.ext\"") != std::string::npos); + } TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { @@ -149,16 +173,16 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { testController.enableDebug(); - std::shared_ptr<core::Processor> processor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + std::shared_ptr<core::Processor> processor = std::make_shared + < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2"); std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - std::shared_ptr<TestRepository> repo = - std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared< - TestFlowController>(test_repo, test_repo); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast + < TestRepository > (test_repo); + std::shared_ptr<minifi::FlowController> controller = std::make_shared + < TestFlowController > (test_repo, test_repo); char format[] = "/tmp/gt.XXXXXX"; char *dir = testController.createTempDirectory(format); @@ -166,8 +190,8 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { uuid_t processoruuid; REQUIRE(true == processor->getUUID(processoruuid)); - std::shared_ptr<minifi::Connection> connection = std::make_shared< - minifi::Connection>(test_repo, "getfileCreate2Connection"); + std::shared_ptr<minifi::Connection> connection = std::make_shared + < minifi::Connection > (test_repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); // link the connections so that we can test results at the end for this @@ -184,7 +208,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { core::ProcessContext context(node, test_repo); core::ProcessSessionFactory factory(&context); context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, - dir); + dir); // replicate 10 threads processor->setScheduledState(core::ScheduledState::RUNNING); processor->onSchedule(&context, &factory); @@ -229,9 +253,9 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { session.commit(); std::shared_ptr<core::FlowFile> ffr = session.get(); - REQUIRE((repo->getRepoMap().size()%2) == 0); - REQUIRE(repo->getRepoMap().size() == (prev+2)); - prev+=2; + REQUIRE((repo->getRepoMap().size() % 2) == 0); + REQUIRE(repo->getRepoMap().size() == (prev + 2)); + prev += 2; } @@ -239,10 +263,10 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { TEST_CASE("LogAttributeTest", "[getfileCreate3]") { std::ostringstream oss; - std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< - logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, - 0)); + std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr + < logging::BaseLogger + > (new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); logger->updateLogger(std::move(outputLogger)); @@ -252,11 +276,11 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>(); - std::shared_ptr<core::Processor> processor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + std::shared_ptr<core::Processor> processor = std::make_shared + < org::apache::nifi::minifi::processors::GetFile > ("getfileCreate2"); - std::shared_ptr<core::Processor> logAttribute = std::make_shared< - org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + std::shared_ptr<core::Processor> logAttribute = std::make_shared + < org::apache::nifi::minifi::processors::LogAttribute > ("logattribute"); char format[] = "/tmp/gt.XXXXXX"; char *dir = testController.createTempDirectory(format); @@ -267,12 +291,12 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { uuid_t logattribute_uuid; REQUIRE(true == logAttribute->getUUID(logattribute_uuid)); - std::shared_ptr<minifi::Connection> connection = std::make_shared< - minifi::Connection>(repo, "getfileCreate2Connection"); + std::shared_ptr<minifi::Connection> connection = std::make_shared + < minifi::Connection > (repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); - std::shared_ptr<minifi::Connection> connection2 = std::make_shared< - minifi::Connection>(repo, "logattribute"); + std::shared_ptr<minifi::Connection> connection2 = std::make_shared + < minifi::Connection > (repo, "logattribute"); connection2->setRelationship(core::Relationship("success", "description")); // link the connections so that we can test results at the end for this @@ -298,7 +322,7 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { core::ProcessContext context(node, repo); core::ProcessContext context2(node2, repo); context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, - dir); + dir); core::ProcessSession session(&context); core::ProcessSession session2(&context2); @@ -357,8 +381,8 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { log_attribute_output.find("key:path value:" + std::string(dir)) != std::string::npos); - outputLogger = std::unique_ptr<logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::NullAppender()); + outputLogger = std::unique_ptr < logging::BaseLogger + > (new org::apache::nifi::minifi::core::logging::NullAppender()); logger->updateLogger(std::move(outputLogger)); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a94ac905/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 80d8642..67039bf 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -25,10 +25,10 @@ /** * Test repository */ -class TestRepository : public core::Repository { - public: - TestRepository() - : Repository("repo_name", "./dir", 1000, 100, 0) { +class TestRepository: public core::Repository { +public: + TestRepository() : + Repository("repo_name", "./dir", 1000, 100, 0) { } // initialize bool initialize() { @@ -42,8 +42,8 @@ class TestRepository : public core::Repository { bool Put(std::string key, uint8_t *buf, int bufLen) { repositoryResults.insert( - std::pair<std::string, std::string>( - key, std::string((const char*) buf, bufLen))); + std::pair<std::string, std::string>(key, + std::string((const char*) buf, bufLen))); return true; } // Delete @@ -66,19 +66,35 @@ class TestRepository : public core::Repository { return repositoryResults; } + void getProvenanceRecord( + std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, + int maxSize) { + for (auto entry : repositoryResults) { + if (records.size() >= maxSize) + break; + std::shared_ptr<provenance::ProvenanceEventRecord> eventRead = + std::make_shared<provenance::ProvenanceEventRecord>(); + + if (eventRead->DeSerialize((uint8_t*) entry.second.data(), + entry.second.length())) { + records.push_back(eventRead); + } + } + } + void run() { // do nothing } - protected: +protected: std::map<std::string, std::string> repositoryResults; }; -class TestFlowController : public minifi::FlowController { +class TestFlowController: public minifi::FlowController { - public: +public: TestFlowController(std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo) - : minifi::FlowController(repo, flow_file_repo, nullptr, "",true) { + std::shared_ptr<core::Repository> flow_file_repo) : + minifi::FlowController(repo, flow_file_repo, nullptr, "", true) { } ~TestFlowController() { @@ -112,7 +128,7 @@ class TestFlowController : public minifi::FlowController { } std::shared_ptr<core::Processor> createProcessor(std::string name, - uuid_t uuid) { + uuid_t uuid) { return 0; } @@ -125,10 +141,10 @@ class TestFlowController : public minifi::FlowController { } std::shared_ptr<minifi::Connection> createConnection(std::string name, - uuid_t uuid) { + uuid_t uuid) { return 0; } - protected: +protected: void initializePaths(const std::string &adjustedFilename) { } };