Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 0043155dc -> 2fb4021e1
MINIFICPP-654 - C API: failure callback improvements This closes #429. Signed-off-by: Marc Parisi <phroc...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2fb4021e Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2fb4021e Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2fb4021e Branch: refs/heads/master Commit: 2fb4021e1b93eaa7ae491a13f88a17e466475a86 Parents: 0043155 Author: Arpad Boda <ab...@hortonworks.com> Authored: Wed Oct 24 15:14:05 2018 +0200 Committer: Marc Parisi <phroc...@apache.org> Committed: Tue Oct 30 07:29:30 2018 -0400 ---------------------------------------------------------------------- libminifi/include/capi/Plan.h | 77 ++++++++++++++++++++++------------ libminifi/include/capi/api.h | 13 +++++- libminifi/include/capi/cstructs.h | 6 ++- libminifi/src/capi/Plan.cpp | 31 ++++++++++++-- libminifi/src/capi/api.cpp | 49 +++++++++++----------- libminifi/test/capi/CAPITests.cpp | 12 ++++-- 6 files changed, 129 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/Plan.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h index 08ad68a..75330a0 100644 --- a/libminifi/include/capi/Plan.h +++ b/libminifi/include/capi/Plan.h @@ -46,6 +46,41 @@ #include "capi/cstructs.h" #include "capi/api.h" +using failure_callback_type = std::function<void(flow_file_record*)>; +using content_repo_sptr = std::shared_ptr<core::ContentRepository>; + +namespace { + + void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) { + auto ff = session->get(); + if (ff == nullptr) { + return; + } + + auto claim = ff->getResourceClaim(); + + if (claim != nullptr && user_callback != nullptr) { + claim->increaseFlowFileRecordOwnedCount(); + // create a flow file. + auto path = claim->getContentFullPath(); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->attributes = ff->getAttributesPtr(); + ffr->ffp = ff.get(); + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); + *content_repo_ptr = cr_ptr; + user_callback(ffr); + } + session->remove(ff); + } + + void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) { + session->rollback(); + failureStrategyAsIs(session, user_callback, cr_ptr); + } +} + +static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies = + { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } }; class ExecutionPlan { public: @@ -67,7 +102,9 @@ class ExecutionPlan { bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr); - bool setFailureCallback(void (*onerror_callback)(const flow_file_record*)); + bool setFailureCallback(failure_callback_type onerror_callback); + + bool setFailureStrategy(FailureStrategy start); std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords(); @@ -100,37 +137,25 @@ class ExecutionPlan { protected: class FailureHandler { public: - FailureHandler() { + FailureHandler(content_repo_sptr cr_ptr) { callback_ = nullptr; + strategy_ = FailureStrategy::AS_IS; + content_repo_ = cr_ptr; } - void setCallback(void (*onerror_callback)(const flow_file_record*)) { + void setCallback(failure_callback_type onerror_callback) { callback_=onerror_callback; } - void operator()(const processor_session* ps) - { + void setStrategy(FailureStrategy strat) { + strategy_ = strat; + } + void operator()(const processor_session* ps) { auto ses = static_cast<core::ProcessSession*>(ps->session); - - auto ff = ses->get(); - if (ff == nullptr) { - return; - } - auto claim = ff->getResourceClaim(); - - if (claim != nullptr && callback_ != nullptr) { - // create a flow file. - auto path = claim->getContentFullPath(); - auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); - ffr->attributes = ff->getAttributesPtr(); - ffr->ffp = ff.get(); - callback_(ffr); - } - // This deletes the content of the flowfile as ff gets out of scope - // It's the users responsibility to copy all the data - ses->remove(ff); - + FailureStrategies.at(strategy_)(ses, callback_, content_repo_); } private: - void (*callback_)(const flow_file_record*); + failure_callback_type callback_; + FailureStrategy strategy_; + content_repo_sptr content_repo_; }; void finalize(); @@ -142,7 +167,7 @@ class ExecutionPlan { std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory; - std::shared_ptr<core::ContentRepository> content_repo_; + content_repo_sptr content_repo_; std::shared_ptr<core::Repository> flow_repo_; std::shared_ptr<core::Repository> prov_repo_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/api.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h index 4e319ef..e05c04e 100644 --- a/libminifi/include/capi/api.h +++ b/libminifi/include/capi/api.h @@ -83,10 +83,19 @@ processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_ses /** * Register your callback to received flow files that the flow failed to process -* The flow file is deleted after the callback is executed, make sure to copy all the data you need! +* The flow file ownership is transferred to the caller! * The first callback should be registered before the flow is used. Can be changed later during runtime. */ -int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*)); +int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)); + + +/** +* Set failure strategy. Please use the enum defined in cstructs.h +* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?) +* Can be changed runtime. +* The defailt strategy is AS IS. +*/ +int set_failure_strategy(flow *flow, FailureStrategy strategy); int set_property(processor *, const char *, const char *); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/cstructs.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/cstructs.h b/libminifi/include/capi/cstructs.h index 731fd55..55197d9 100644 --- a/libminifi/include/capi/cstructs.h +++ b/libminifi/include/capi/cstructs.h @@ -19,6 +19,8 @@ #ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ #define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ +#include <stddef.h> +#include <stdint.h> /** * NiFi Port struct @@ -105,6 +107,8 @@ typedef struct { void * in; + void * crp; + char * contentLocation; /**< Filesystem location of this object */ void *attributes; /**< Hash map of attributes */ @@ -118,6 +122,6 @@ typedef struct { void *plan; } flow; - +typedef enum FS { AS_IS, ROLLBACK } FailureStrategy; #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/src/capi/Plan.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp index 0abd63b..78b864c 100644 --- a/libminifi/src/capi/Plan.cpp +++ b/libminifi/src/capi/Plan.cpp @@ -23,6 +23,18 @@ #include <set> #include <string> +bool intToFailureStragey(int in, FailureStrategy *out) { + auto tmp = static_cast<FailureStrategy>(in); + switch (tmp) { + case AS_IS: + case ROLLBACK: + *out = tmp; + return true; + default: + return false; + } +} + std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator(); ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo) @@ -192,7 +204,12 @@ void ExecutionPlan::finalize() { callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1)); for (const auto& proc : processor_queue_) { - relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true)); + for (const auto& rel : proc->getSupportedRelationships()) { + if (rel.getName() == "failure") { + relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true)); + break; + } + } } std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc); @@ -256,14 +273,22 @@ std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared return connection; } -bool ExecutionPlan::setFailureCallback(void (*onerror_callback)(const flow_file_record*)) { +bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) { if (finalized && !failure_handler_) { return false; // Already finalized the flow without failure handler processor } if (!failure_handler_) { - failure_handler_ = std::make_shared<FailureHandler>(); + failure_handler_ = std::make_shared<FailureHandler>(getContentRepo()); } failure_handler_->setCallback(onerror_callback); return true; } +bool ExecutionPlan::setFailureStrategy(FailureStrategy start) { + if (!failure_handler_) { + return false; + } + failure_handler_->setStrategy(start); + return true; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/src/capi/api.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp index 21010a1..58328ef 100644 --- a/libminifi/src/capi/api.cpp +++ b/libminifi/src/capi/api.cpp @@ -159,14 +159,9 @@ flow_file_record* create_ff_object(const char *file, const size_t len, const uin if (nullptr == file) { return nullptr; } - flow_file_record *new_ff = new flow_file_record; + flow_file_record *new_ff = create_ff_object_na(file, len, size); new_ff->attributes = new string_map(); new_ff->ffp = 0; - new_ff->contentLocation = new char[len + 1]; - snprintf(new_ff->contentLocation, len + 1, "%s", file); - std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); - // set the size of the flow file. - new_ff->size = size; return new_ff; } @@ -175,9 +170,9 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const new_ff->attributes = nullptr; new_ff->contentLocation = new char[len + 1]; snprintf(new_ff->contentLocation, len + 1, "%s", file); - std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); // set the size of the flow file. new_ff->size = size; + new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>); return new_ff; } /** @@ -185,21 +180,21 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const * @param ff flow file record. */ void free_flowfile(flow_file_record *ff) { - if (ff != nullptr) { - if (ff->in != nullptr) { - auto instance = static_cast<nifi_instance*>(ff->in); - auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - auto content_repo = minifi_instance_ref->getContentRepository(); - std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo); - content_repo->remove(claim); - } - if (ff->ffp == nullptr) { - auto map = static_cast<string_map*>(ff->attributes); - delete map; - } - delete[] ff->contentLocation; - delete ff; + if (ff == nullptr) { + return; + } + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp); + if (content_repo_ptr->get()) { + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr); + (*content_repo_ptr)->remove(claim); } + if (ff->ffp == nullptr) { + auto map = static_cast<string_map*>(ff->attributes); + delete map; + } + delete[] ff->contentLocation; + delete ff; + delete content_repo_ptr; } /** @@ -405,11 +400,15 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name) { return nullptr; } -int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*)) { +int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) { ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); return plan->setFailureCallback(onerror_callback) ? 0 : 1; } +int set_failure_strategy(flow *flow, FailureStrategy strategy) { + return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1; +} + int set_property(processor *proc, const char *name, const char *value) { if (name != nullptr && value != nullptr && proc != nullptr) { core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr); @@ -447,7 +446,8 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); ffr->ffp = ff.get(); ffr->attributes = ff->getAttributesPtr(); - ffr->in = instance; + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); + *content_repo_ptr = execution_plan->getContentRepo(); return ffr; } else { return nullptr; @@ -489,7 +489,8 @@ flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *se auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); ffr->attributes = ff->getAttributesPtr(); ffr->ffp = ff.get(); - ffr->in = instance; + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); + *content_repo_ptr = execution_plan->getContentRepo(); return ffr; } else { return nullptr; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/test/capi/CAPITests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp index d4e1c49..368c9a1 100644 --- a/libminifi/test/capi/CAPITests.cpp +++ b/libminifi/test/capi/CAPITests.cpp @@ -34,19 +34,22 @@ static nifi_instance *create_instance_obj(const char *name = "random_instance") { nifi_port port; - port.port_id = "12345"; + char port_str[] = "12345"; + port.port_id = port_str; return create_instance("random_instance", &port); } static int failure_count = 0; -void failure_counter(const flow_file_record * fr) { +void failure_counter(flow_file_record * fr) { failure_count++; REQUIRE(get_attribute_qty(fr) > 0); + free_flowfile(fr); } -void big_failure_counter(const flow_file_record * fr) { +void big_failure_counter(flow_file_record * fr) { failure_count += 100; + free_flowfile(fr); } TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") { @@ -226,6 +229,8 @@ TEST_CASE("Test error handling callback", "[errorHandling]") { flow *test_flow = create_flow(instance, nullptr); REQUIRE(test_flow != nullptr); + // Failure strategy cannot be set before a valid callback is added + REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0); REQUIRE(add_failure_callback(test_flow, failure_counter) == 0); processor *get_proc = add_processor(test_flow, "GetFile"); @@ -251,6 +256,7 @@ TEST_CASE("Test error handling callback", "[errorHandling]") { // Failure handler function can be replaced runtime REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0); + REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0); // Create new testfile to trigger failure again ss << "2";