This is an automated email from the ASF dual-hosted git repository. aldrin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f01953b8cd8814fa5b4faa0a7cb588f0545bb231 Author: Arpad Boda <ab...@hortonworks.com> AuthorDate: Mon Feb 18 23:19:16 2019 +0100 MINIFICPP-738 - EL should be able to access global properties --- extensions/expression-language/Expression.cpp | 8 ++++++-- extensions/expression-language/ProcessContextExpr.cpp | 10 ++++++++-- .../expression-language/impl/expression/Expression.h | 9 +++++++-- extensions/rocksdb-repos/DatabaseContentRepository.cpp | 1 + libminifi/include/core/ContentRepository.h | 5 +++++ libminifi/src/core/repository/FileSystemRepository.cpp | 1 + .../src/core/repository/VolatileContentRepository.cpp | 1 + libminifi/test/TestBase.h | 6 ++++-- .../ExpressionLanguageTests.cpp | 17 ++++++++++++++++- 9 files changed, 49 insertions(+), 9 deletions(-) diff --git a/extensions/expression-language/Expression.cpp b/extensions/expression-language/Expression.cpp index b031b04..07fd509 100644 --- a/extensions/expression-language/Expression.cpp +++ b/extensions/expression-language/Expression.cpp @@ -69,9 +69,13 @@ Expression make_dynamic_attr(const std::string &attribute_id) { const auto cur_flow_file = params.flow_file.lock(); if (cur_flow_file && cur_flow_file->getAttribute(attribute_id, result)) { return Value(result); - } else { - return Value(); + } else if (attribute_id.rfind("nifi.", 0) == 0) { + const auto config = params.configuration.lock(); + if (config && config->get(attribute_id, result)) { + return Value(result); + } } + return Value(); }); } diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp index 5cceb77..71c0b36 100644 --- a/extensions/expression-language/ProcessContextExpr.cpp +++ b/extensions/expression-language/ProcessContextExpr.cpp @@ -34,7 +34,10 @@ bool ProcessContext::getProperty(const Property &property, std::string &value, c expressions_.emplace(name, expression::compile(expression_str)); } - value = expressions_[name]( { flow_file }).asString(); + minifi::expression::Parameters p(flow_file); + p.configuration = content_repo_->getConfig(); + + value = expressions_[name]( p ).asString(); return true; } @@ -50,7 +53,10 @@ bool ProcessContext::getDynamicProperty(const Property &property, std::string &v dynamic_property_expressions_.emplace(name, expression::compile(expression_str)); } - value = dynamic_property_expressions_[name]( { flow_file }).asString(); + minifi::expression::Parameters p(flow_file); + p.configuration = content_repo_->getConfig(); + + value = dynamic_property_expressions_[name]( p ).asString(); return true; } diff --git a/extensions/expression-language/impl/expression/Expression.h b/extensions/expression-language/impl/expression/Expression.h index 15f7558..826f588 100644 --- a/extensions/expression-language/impl/expression/Expression.h +++ b/extensions/expression-language/impl/expression/Expression.h @@ -45,9 +45,14 @@ namespace nifi { namespace minifi { namespace expression { -typedef struct { +struct Parameters { std::weak_ptr<core::FlowFile> flow_file; -} Parameters; + std::weak_ptr<minifi::Configure> configuration; + Parameters(std::shared_ptr<core::FlowFile> ff = nullptr) { + flow_file = ff; + } + +}; class Expression; diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 5842bf2..29864f4 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -30,6 +30,7 @@ namespace core { namespace repository { bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) { + configuration_ = configuration; std::string value; if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) { directory_ = value; diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index 51d2765..50c6de1 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -108,6 +108,10 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> { } } + virtual std::weak_ptr<Configure> getConfig(){ + return configuration_; + } + protected: std::string directory_; @@ -116,6 +120,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> { std::map<std::string, uint32_t> count_map_; + std::weak_ptr<Configure> configuration_; }; } /* namespace core */ diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp index c0f1694..0a114d3 100644 --- a/libminifi/src/core/repository/FileSystemRepository.cpp +++ b/libminifi/src/core/repository/FileSystemRepository.cpp @@ -30,6 +30,7 @@ namespace core { namespace repository { bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) { + configuration_ = configuration; std::string value; if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) { directory_ = value; diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 674566b..1b27157 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -35,6 +35,7 @@ namespace repository { const char *VolatileContentRepository::minimal_locking = "minimal.locking"; bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) { + configuration_ = configure; VolatileRepository::initialize(configure); resource_claim_comparator_ = [](std::shared_ptr<minifi::ResourceClaim> lhsPtr, std::shared_ptr<minifi::ResourceClaim> rhsPtr) { if (lhsPtr == nullptr || rhsPtr == nullptr) { diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index c12edeb..c8104d5 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -249,8 +249,10 @@ class TestController { flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test"); } - std::shared_ptr<TestPlan> createPlan() { - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr) { + if(configuration == nullptr) { + configuration = std::make_shared<minifi::Configure>(); + } std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); content_repo->initialize(configuration); diff --git a/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp b/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp index 3ee6ec3..c96cd12 100644 --- a/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp +++ b/libminifi/test/expression-language-tests/ExpressionLanguageTests.cpp @@ -25,6 +25,7 @@ #include <ExtractText.h> #include <GetFile.h> #include <PutFile.h> +#include <UpdateAttribute.h> #include <LogAttribute.h> namespace expression = org::apache::nifi::minifi::expression; @@ -176,8 +177,13 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu LogTestController::getInstance().setTrace<processors::GetFile>(); LogTestController::getInstance().setTrace<processors::PutFile>(); LogTestController::getInstance().setTrace<processors::LogAttribute>(); + LogTestController::getInstance().setTrace<processors::UpdateAttribute>(); - auto plan = testController.createPlan(); + auto conf = std::make_shared<minifi::Configure>(); + + conf->set("nifi.my.own.property", "custom_value"); + + auto plan = testController.createPlan(conf); auto repo = std::make_shared<TestRepository>(); std::string in_dir("/tmp/gt.XXXXXX"); @@ -203,6 +209,12 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu get_file, processors::GetFile::KeepSourceFile.getName(), "false"); + auto update = plan->addProcessor( + "UpdateAttribute", + "UpdateAttribute", + core::Relationship("success", "description"), + true); + update->setDynamicProperty("prop_attr", "${'nifi.my.own.property'}_added"); plan->addProcessor( "LogAttribute", "LogAttribute", @@ -246,6 +258,7 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu } plan->runNextProcessor(); // GetFile + plan->runNextProcessor(); // Update plan->runNextProcessor(); // Log plan->runNextProcessor(); // ExtractText plan->runNextProcessor(); // Log @@ -258,6 +271,8 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu output_str << out_file_stream.rdbuf(); REQUIRE("extracted_attr" == output_str.str()); } + + REQUIRE(LogTestController::getInstance().contains("key:prop_attr value:custom_value_added")); } TEST_CASE("Substring 2 arg", "[expressionLanguageSubstring2]") { // NOLINT