This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push: new 4faf65c MINIFICPP-1407 - Fetch flow configuration file from C2 if not found locally 4faf65c is described below commit 4faf65c1fccd7e273a8da4931170f49a314443e6 Author: Adam Debreceni <adam.debrec...@protonmail.com> AuthorDate: Tue Nov 17 15:09:36 2020 +0100 MINIFICPP-1407 - Fetch flow configuration file from C2 if not found locally Signed-off-by: Arpad Boda <ab...@apache.org> This closes #957 --- encrypt-config/ConfigFile.cpp | 105 --------- encrypt-config/ConfigFile.h | 44 +--- encrypt-config/tests/ConfigFileEncryptorTests.cpp | 12 +- encrypt-config/tests/ConfigFileTests.cpp | 7 +- ...UpdateTest.cpp => C2FetchFlowIfMissingTest.cpp} | 24 +- extensions/http-curl/tests/C2UpdateAgentTest.cpp | 2 +- extensions/http-curl/tests/C2UpdateTest.cpp | 2 +- extensions/http-curl/tests/CMakeLists.txt | 1 + extensions/http-curl/tests/HTTPHandlers.h | 47 ++-- extensions/http-curl/tests/HTTPIntegrationBase.h | 32 +++ libminifi/CMakeLists.txt | 2 +- libminifi/include/FlowController.h | 126 +---------- libminifi/include/c2/C2Agent.h | 4 +- libminifi/include/c2/C2Client.h | 5 +- .../core/controller/ControllerServiceProvider.h | 2 +- .../ForwardingControllerServiceProvider.h | 136 ++++++++++++ libminifi/include/properties/Properties.h | 36 ++- .../include/properties/PropertiesFile.h | 68 +++--- libminifi/include/utils/file/FileUtils.h | 2 +- libminifi/src/FlowController.cpp | 207 ++++------------- libminifi/src/Properties.cpp | 245 --------------------- libminifi/src/c2/C2Agent.cpp | 2 +- libminifi/src/c2/C2Client.cpp | 9 + libminifi/src/core/logging/LoggerConfiguration.cpp | 2 +- libminifi/src/properties/Properties.cpp | 150 +++++++++++++ libminifi/src/properties/PropertiesFile.cpp | 158 +++++++++++++ libminifi/test/resources/TestEmpty.yml | 21 ++ 27 files changed, 674 insertions(+), 777 deletions(-) diff --git a/encrypt-config/ConfigFile.cpp b/encrypt-config/ConfigFile.cpp index 4d6d0ca..b361d6b 100644 --- a/encrypt-config/ConfigFile.cpp +++ b/encrypt-config/ConfigFile.cpp @@ -35,111 +35,6 @@ namespace nifi { namespace minifi { namespace encrypt_config { -ConfigLine::ConfigLine(std::string line) : line_(line) { - line = utils::StringUtils::trim(line); - if (line.empty() || line[0] == '#') { return; } - - size_t index_of_first_equals_sign = line.find('='); - if (index_of_first_equals_sign == std::string::npos) { return; } - - std::string key = utils::StringUtils::trim(line.substr(0, index_of_first_equals_sign)); - if (key.empty()) { return; } - - key_ = key; - value_ = utils::StringUtils::trim(line.substr(index_of_first_equals_sign + 1)); -} - -ConfigLine::ConfigLine(std::string key, std::string value) - : line_{utils::StringUtils::join_pack(key, "=", value)}, key_{std::move(key)}, value_{std::move(value)} { -} - -void ConfigLine::updateValue(const std::string& value) { - auto pos = line_.find('='); - if (pos != std::string::npos) { - line_.replace(pos + 1, std::string::npos, value); - value_ = value; - } else { - throw std::invalid_argument{"Cannot update value in config line: it does not contain an = sign!"}; - } -} - -ConfigFile::ConfigFile(std::istream& input_stream) { - std::string line; - while (std::getline(input_stream, line)) { - config_lines_.push_back(ConfigLine{line}); - } -} - -ConfigFile::Lines::const_iterator ConfigFile::findKey(const std::string& key) const { - return std::find_if(config_lines_.cbegin(), config_lines_.cend(), [&key](const ConfigLine& config_line) { - return config_line.getKey() == key; - }); -} - -ConfigFile::Lines::iterator ConfigFile::findKey(const std::string& key) { - return std::find_if(config_lines_.begin(), config_lines_.end(), [&key](const ConfigLine& config_line) { - return config_line.getKey() == key; - }); -} - -bool ConfigFile::hasValue(const std::string& key) const { - const auto it = findKey(key); - return (it != config_lines_.end()); -} - -utils::optional<std::string> ConfigFile::getValue(const std::string& key) const { - const auto it = findKey(key); - if (it != config_lines_.end()) { - return it->getValue(); - } else { - return utils::nullopt; - } -} - -void ConfigFile::update(const std::string& key, const std::string& value) { - auto it = findKey(key); - if (it != config_lines_.end()) { - it->updateValue(value); - } else { - throw std::invalid_argument{"Key " + key + " not found in the config file!"}; - } -} - -void ConfigFile::insertAfter(const std::string& after_key, const std::string& key, const std::string& value) { - auto it = findKey(after_key); - if (it != config_lines_.end()) { - ++it; - config_lines_.emplace(it, key, value); - } else { - throw std::invalid_argument{"Key " + after_key + " not found in the config file!"}; - } -} - -void ConfigFile::append(const std::string& key, const std::string& value) { - config_lines_.emplace_back(key, value); -} - -int ConfigFile::erase(const std::string& key) { - auto has_this_key = [&key](const ConfigLine& line) { return line.getKey() == key; }; - auto new_end = std::remove_if(config_lines_.begin(), config_lines_.end(), has_this_key); - auto num_removed = std::distance(new_end, config_lines_.end()); - config_lines_.erase(new_end, config_lines_.end()); - return gsl::narrow<int>(num_removed); -} - -void ConfigFile::writeTo(const std::string& file_path) const { - try { - std::ofstream file{file_path}; - file.exceptions(std::ios::failbit | std::ios::badbit); - - for (const auto& config_line : config_lines_) { - file << config_line.getLine() << '\n'; - } - } catch (const std::exception&) { - throw std::runtime_error{"Could not write to file " + file_path}; - } -} - std::vector<std::string> ConfigFile::getSensitiveProperties() const { std::vector<std::string> sensitive_properties(DEFAULT_SENSITIVE_PROPERTIES.begin(), DEFAULT_SENSITIVE_PROPERTIES.end()); const utils::optional<std::string> additional_sensitive_props_list = getValue(ADDITIONAL_SENSITIVE_PROPS_PROPERTY_NAME); diff --git a/encrypt-config/ConfigFile.h b/encrypt-config/ConfigFile.h index 8eba793..61b7a0f 100644 --- a/encrypt-config/ConfigFile.h +++ b/encrypt-config/ConfigFile.h @@ -22,6 +22,7 @@ #include "utils/EncryptionUtils.h" #include "utils/OptionalUtils.h" +#include "properties/PropertiesFile.h" namespace org { namespace apache { @@ -29,55 +30,17 @@ namespace nifi { namespace minifi { namespace encrypt_config { -class ConfigLine { +class ConfigFile : public PropertiesFile { public: - explicit ConfigLine(std::string line); - ConfigLine(std::string key, std::string value); - - void updateValue(const std::string& value); - - std::string getLine() const { return line_; } - std::string getKey() const { return key_; } - std::string getValue() const { return value_; } - - private: - // NOTE(fgerlits): having both line_ and { key_, value } is redundant in many cases, but - // * we need the original line_ in order to preserve formatting, comments and blank lines - // * we could get rid of key_ and value_ and parse them each time from line_, but I think the code is clearer this way - std::string line_; - std::string key_; - std::string value_; -}; - -class ConfigFile { - public: - explicit ConfigFile(std::istream& input_stream); - explicit ConfigFile(std::istream&& input_stream) : ConfigFile{input_stream} {} - - bool hasValue(const std::string& key) const; - utils::optional<std::string> getValue(const std::string& key) const; - void update(const std::string& key, const std::string& value); - void insertAfter(const std::string& after_key, const std::string& key, const std::string& value); - void append(const std::string& key, const std::string& value); - int erase(const std::string& key); - - void writeTo(const std::string& file_path) const; - - size_t size() const { return config_lines_.size(); } + using PropertiesFile::PropertiesFile; std::vector<std::string> getSensitiveProperties() const; private: friend class ConfigFileTestAccessor; friend bool operator==(const ConfigFile&, const ConfigFile&); - using Lines = std::vector<ConfigLine>; - - Lines::const_iterator findKey(const std::string& key) const; - Lines::iterator findKey(const std::string& key); static std::vector<std::string> mergeProperties(std::vector<std::string> properties, const std::vector<std::string>& additional_properties); - - Lines config_lines_; }; } // namespace encrypt_config @@ -85,3 +48,4 @@ class ConfigFile { } // namespace nifi } // namespace apache } // namespace org + diff --git a/encrypt-config/tests/ConfigFileEncryptorTests.cpp b/encrypt-config/tests/ConfigFileEncryptorTests.cpp index e20791a..8d07abb 100644 --- a/encrypt-config/tests/ConfigFileEncryptorTests.cpp +++ b/encrypt-config/tests/ConfigFileEncryptorTests.cpp @@ -46,15 +46,17 @@ namespace org { namespace apache { namespace nifi { namespace minifi { -namespace encrypt_config { // NOTE(fgerlits): these ==/!= operators are in the test file on purpose, and should not be part of production code, // as they take a varying amount of time depending on which character in the line differs, so they would open up // our code to timing attacks. If you need == in production code, make sure to compare all pairs of chars/lines. -bool operator==(const ConfigLine& left, const ConfigLine& right) { return left.getLine() == right.getLine(); } -bool operator!=(const ConfigLine& left, const ConfigLine& right) { return !(left == right); } -bool operator==(const ConfigFile& left, const ConfigFile& right) { return left.config_lines_ == right.config_lines_; } -bool operator!=(const ConfigFile& left, const ConfigFile& right) { return !(left == right); } +bool operator==(const PropertiesFile::Line& left, const PropertiesFile::Line& right) { return left.getLine() == right.getLine(); } + +namespace encrypt_config { + +bool operator!=(const ConfigFile::Line& left, const ConfigFile::Line& right) { return !(left == right); } +bool operator==(const ConfigFile& left, const ConfigFile& right) { return left.lines_ == right.lines_; } +bool operator!=(const ConfigFile& left, const ConfigFile& right) { return !(left == right); } } // namespace encrypt_config } // namespace minifi diff --git a/encrypt-config/tests/ConfigFileTests.cpp b/encrypt-config/tests/ConfigFileTests.cpp index 36cc214..04e7a94 100644 --- a/encrypt-config/tests/ConfigFileTests.cpp +++ b/encrypt-config/tests/ConfigFileTests.cpp @@ -43,11 +43,10 @@ class ConfigFileTestAccessor { using org::apache::nifi::minifi::encrypt_config::ConfigFile; using org::apache::nifi::minifi::encrypt_config::ConfigFileTestAccessor; -using org::apache::nifi::minifi::encrypt_config::ConfigLine; TEST_CASE("ConfigLine can be constructed from a line", "[encrypt-config][constructor]") { auto line_is_parsed_correctly = [](const std::string& line, const std::string& expected_key, const std::string& expected_value) { - ConfigLine config_line{line}; + ConfigFile::Line config_line{line}; return config_line.getKey() == expected_key && config_line.getValue() == expected_value; }; @@ -69,7 +68,7 @@ TEST_CASE("ConfigLine can be constructed from a line", "[encrypt-config][constru TEST_CASE("ConfigLine can be constructed from a key-value pair", "[encrypt-config][constructor]") { auto can_construct_from_kv = [](const std::string& key, const std::string& value, const std::string& expected_line) { - ConfigLine config_line{key, value}; + ConfigFile::Line config_line{key, value}; return config_line.getLine() == expected_line; }; @@ -79,7 +78,7 @@ TEST_CASE("ConfigLine can be constructed from a key-value pair", "[encrypt-confi TEST_CASE("ConfigLine can update the value", "[encrypt-config][updateValue]") { auto can_update_value = [](const std::string& original_line, const std::string& new_value, const std::string& expected_line) { - ConfigLine config_line{original_line}; + ConfigFile::Line config_line{original_line}; config_line.updateValue(new_value); return config_line.getLine() == expected_line; }; diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2FetchFlowIfMissingTest.cpp similarity index 65% copy from extensions/http-curl/tests/C2UpdateTest.cpp copy to extensions/http-curl/tests/C2FetchFlowIfMissingTest.cpp index 24d4693..12133cc 100644 --- a/extensions/http-curl/tests/C2UpdateTest.cpp +++ b/extensions/http-curl/tests/C2FetchFlowIfMissingTest.cpp @@ -19,23 +19,27 @@ #undef NDEBUG #include "HTTPIntegrationBase.h" #include "HTTPHandlers.h" -#include "utils/gsl.h" #include "utils/IntegrationTestUtils.h" +#include "utils/file/PathUtils.h" int main(int argc, char **argv) { - const cmd_args args = parse_cmdline_args(argc, argv, "update"); - C2UpdateHandler handler(args.test_file); - VerifyC2Update harness(10000); + TestController controller; + char format[] = "/var/tmp/c2.XXXXXX"; + std::string minifi_home = controller.createTempDirectory(format); + const cmd_args args = parse_cmdline_args(argc, argv); + C2FlowProvider handler(args.test_file); + VerifyFlowFetched harness(10000); harness.setKeyDir(args.key_dir); harness.setUrl(args.url, &handler); - handler.setC2RestResponse(harness.getC2RestUrl(), "configuration"); + harness.setFlowUrl(harness.getC2RestUrl()); - const auto start = std::chrono::system_clock::now(); + std::string config_path = utils::file::PathUtils::concat_path(minifi_home, "config.yml"); - harness.run(args.test_file); + harness.run(config_path); + + // check existence of the config file + assert(std::ifstream{config_path}); - const auto then = std::chrono::system_clock::now(); - const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - start).count(); - assert(handler.calls_ <= gsl::narrow<size_t>(seconds + 1)); return 0; } + diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp b/extensions/http-curl/tests/C2UpdateAgentTest.cpp index 5e61f7e..8c91363 100644 --- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp +++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp @@ -35,6 +35,6 @@ int main(int argc, char **argv) { const auto then = std::chrono::system_clock::now(); const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - start).count(); - assert(handler.calls_ <= gsl::narrow<size_t>(seconds + 2)); + assert(handler.getCallCount() <= gsl::narrow<size_t>(seconds + 2)); return 0; } diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp index 24d4693..c63dfc3 100644 --- a/extensions/http-curl/tests/C2UpdateTest.cpp +++ b/extensions/http-curl/tests/C2UpdateTest.cpp @@ -36,6 +36,6 @@ int main(int argc, char **argv) { const auto then = std::chrono::system_clock::now(); const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - start).count(); - assert(handler.calls_ <= gsl::narrow<size_t>(seconds + 1)); + assert(handler.getCallCount() <= gsl::narrow<size_t>(seconds + 1)); return 0; } diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt index 04a9627..e6ff766 100644 --- a/extensions/http-curl/tests/CMakeLists.txt +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -71,6 +71,7 @@ add_test(NAME HTTPStreamingCallbackTests COMMAND "HTTPStreamingCallbackTests" WO add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") +add_test(NAME C2FetchFlowIfMissingTest COMMAND C2FetchFlowIfMissingTest "${TEST_RESOURCES}/TestEmpty.yml" "${TEST_RESOURCES}/") add_test(NAME C2ConfigEncryption COMMAND C2ConfigEncryption "${TEST_RESOURCES}/decrypted.config.yml" "${TEST_RESOURCES}/") add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2DescribeManifestTest COMMAND C2DescribeManifestTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h index 4e7d62b..6867c77 100644 --- a/extensions/http-curl/tests/HTTPHandlers.h +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -448,20 +448,20 @@ class HeartbeatHandler : public ServerAwareHandler { } }; -class C2UpdateHandler : public ServerAwareHandler { +class C2FlowProvider : public ServerAwareHandler { public: - explicit C2UpdateHandler(const std::string& test_file_location) - : test_file_location_(test_file_location) { + explicit C2FlowProvider(std::string test_file_location) + : test_file_location_(std::move(test_file_location)) { } - bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override { - calls_++; - if (!response_.empty()) { + bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override { + std::ifstream myfile(test_file_location_.c_str(), std::ios::in | std::ios::binary); + if (myfile.good()) { + std::string str((std::istreambuf_iterator<char>(myfile)), (std::istreambuf_iterator<char>())); mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - response_.length()); - mg_printf(conn, "%s", response_.c_str()); - response_.clear(); + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); } else { mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); } @@ -469,14 +469,22 @@ class C2UpdateHandler : public ServerAwareHandler { return true; } - bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override { - std::ifstream myfile(test_file_location_.c_str(), std::ios::in | std::ios::binary); - if (myfile.good()) { - std::string str((std::istreambuf_iterator<char>(myfile)), (std::istreambuf_iterator<char>())); + private: + const std::string test_file_location_; +}; + +class C2UpdateHandler : public C2FlowProvider { + public: + using C2FlowProvider::C2FlowProvider; + + bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override { + calls_++; + if (!response_.empty()) { mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - str.length()); - mg_printf(conn, "%s", str.c_str()); + response_.length()); + mg_printf(conn, "%s", response_.c_str()); + response_.clear(); } else { mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); } @@ -499,9 +507,14 @@ class C2UpdateHandler : public ServerAwareHandler { "\"content\": " + content + "}]}"; } + size_t getCallCount() const { + return calls_; + } + + protected: std::atomic<size_t> calls_{0}; + private: - std::string test_file_location_; std::string response_; }; diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h index 29e5672..0c01f04 100644 --- a/extensions/http-curl/tests/HTTPIntegrationBase.h +++ b/extensions/http-curl/tests/HTTPIntegrationBase.h @@ -189,6 +189,38 @@ public: } }; +class VerifyFlowFetched : public HTTPIntegrationBase { + public: + using HTTPIntegrationBase::HTTPIntegrationBase; + + void testSetup() override { + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + } + + void configureC2() override { + configuration->set("nifi.c2.agent.protocol.class", "RESTSender"); + configuration->set("nifi.c2.enable", "true"); + configuration->set("nifi.c2.agent.class", "test"); + configuration->set("nifi.c2.agent.heartbeat.period", "1000"); + } + + void setFlowUrl(const std::string& url) { + configuration->set(minifi::Configure::nifi_c2_flow_url, url); + } + + void cleanup() override { + LogTestController::getInstance().reset(); + } + + void runAssertions() override { + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10), "Successfully fetched valid flow configuration")); + } +}; + class VerifyC2UpdateAgent : public VerifyC2Update { public: explicit VerifyC2UpdateAgent(uint64_t waitTime) diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 9366ed6..c7140ca 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -89,7 +89,7 @@ if (NOT OPENSSL_OFF) set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp") endif() -file(GLOB SOURCES "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/serialization/*.cpp" "src/provenance/*.cpp" "src/ut [...] +file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/serialization/*.cpp" "src/pro [...] # manually add this as it might not yet be present when this executes list(APPEND SOURCES "src/agent/agent_version.cpp") diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 6c4a00e..9ed88a3 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -35,6 +35,7 @@ #include "Connection.h" #include "core/controller/ControllerServiceNode.h" #include "core/controller/ControllerServiceProvider.h" +#include "core/controller/ForwardingControllerServiceProvider.h" #include "core/FlowConfiguration.h" #include "core/logging/Logger.h" #include "core/ProcessContext.h" @@ -68,7 +69,7 @@ namespace minifi { * Flow Controller class. Generally used by FlowController factory * as a singleton. */ -class FlowController : public core::controller::ControllerServiceProvider, public state::StateMonitor, public c2::C2Client, public std::enable_shared_from_this<FlowController> { +class FlowController : public core::controller::ForwardingControllerServiceProvider, public state::StateMonitor, public c2::C2Client, public std::enable_shared_from_this<FlowController> { public: FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, @@ -182,119 +183,6 @@ class FlowController : public core::controller::ControllerServiceProvider, publ } /** - * Creates a controller service through the controller service provider impl. - * @param type class name - * @param id service identifier - * @param firstTimeAdded first time this CS was added - */ - std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool firstTimeAdded) override; - - /** - * controller service provider - */ - /** - * removes controller service - * @param serviceNode service node to be removed. - */ - - void removeControllerService(const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Enables the controller service services - * @param serviceNode service node which will be disabled, along with linked services. - */ - std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Enables controller services - * @param serviceNoden vector of service nodes which will be enabled, along with linked services. - */ - void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) override; - - /** - * Disables controller services - * @param serviceNode service node which will be disabled, along with linked services. - */ - std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Removes all controller services. - */ - void clearControllerServices() override; - - /** - * Gets all controller services. - */ - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices() override; - - std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) override; - - /** - * Gets controller service node specified by <code>id</code> - * @param id service identifier - * @return shared pointer to the controller service node or nullptr if it does not exist. - */ - std::shared_ptr<core::controller::ControllerServiceNode> getControllerServiceNode(const std::string &id) const override; - - void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Unschedules referencing components. - */ - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Verify can disable referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ - void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Disables referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Verify can enable referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ - void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>&) override; - - /** - * Determines if the controller service specified by identifier is enabled. - */ - bool isControllerServiceEnabled(const std::string &identifier) override; - - /** - * Enables referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Schedules referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override; - - /** - * Returns controller service components referenced by serviceIdentifier from the embedded - * controller service provider; - */ - std::shared_ptr<core::controller::ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) override; - - /** - * Enables all controller services for the provider. - */ - void enableAllControllerServices() override; - - /** - * Disables all controller services for the provider. - */ - void disableAllControllerServices() override; - - /** * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest * @return the agent manifest response node */ @@ -304,6 +192,14 @@ class FlowController : public core::controller::ControllerServiceProvider, publ std::vector<BackTrace> getTraces() override; + private: + /** + * Loads the flow as specified in the flow config file or if not present + * tries to fetch it from the C2 server (if enabled). + * @return the built flow + */ + std::unique_ptr<core::ProcessGroup> loadInitialFlow(); + protected: // function to load the flow file repo. void loadFlowRepo(); @@ -339,8 +235,6 @@ class FlowController : public core::controller::ControllerServiceProvider, publ std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_; // FlowControl Protocol std::unique_ptr<FlowControlProtocol> protocol_; - std::shared_ptr<core::controller::ControllerServiceMap> controller_service_map_; - std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_; // metrics information std::chrono::steady_clock::time_point start_time_; diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index 4e61300..9cc6c28 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -87,6 +87,8 @@ class C2Agent : public state::UpdateController { return heart_beat_period_; } + utils::optional<std::string> fetchFlow(const std::string& uri) const; + protected: void restart_agent(); @@ -161,8 +163,6 @@ class C2Agent : public state::UpdateController { bool handleConfigurationUpdate(const C2ContentResponse &resp); - utils::optional<std::string> fetchFlow(const std::string& uri) const; - protected: std::timed_mutex metrics_mutex_; std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_; diff --git a/libminifi/include/c2/C2Client.h b/libminifi/include/c2/C2Client.h index 7f81761..982d6cf 100644 --- a/libminifi/include/c2/C2Client.h +++ b/libminifi/include/c2/C2Client.h @@ -23,7 +23,7 @@ #include <map> #include <string> #include <mutex> -#include "core/state/UpdateController.h" +#include "c2/C2Agent.h" #include "core/controller/ControllerServiceProvider.h" #include "properties/Configure.h" #include "core/logging/Logger.h" @@ -58,6 +58,7 @@ class C2Client : public core::Flow, public state::response::NodeReporter { protected: bool isC2Enabled() const; + utils::optional<std::string> fetchFlow(const std::string& uri) const; private: void initializeComponentMetrics(); @@ -67,9 +68,9 @@ class C2Client : public core::Flow, public state::response::NodeReporter { protected: std::shared_ptr<Configure> configuration_; std::shared_ptr<utils::file::FileSystem> filesystem_; - std::unique_ptr<state::UpdateController> c2_agent_; private: + std::unique_ptr<C2Agent> c2_agent_; std::atomic_bool initialized_{false}; std::shared_ptr<logging::Logger> logger_; diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h index 3c99265..24861eb 100644 --- a/libminifi/include/core/controller/ControllerServiceProvider.h +++ b/libminifi/include/core/controller/ControllerServiceProvider.h @@ -225,7 +225,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo virtual void disableAllControllerServices() = 0; - virtual bool supportsDynamicProperties() { + bool supportsDynamicProperties() final { return false; } diff --git a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h new file mode 100644 index 0000000..9aa4114 --- /dev/null +++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h @@ -0,0 +1,136 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <vector> +#include <string> + +#include "ControllerServiceProvider.h" +#include "ControllerServiceNode.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +class ForwardingControllerServiceProvider : public ControllerServiceProvider { + public: + using ControllerServiceProvider::ControllerServiceProvider; + + std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &longType, const std::string &id, bool firstTimeAdded) override { + return controller_service_provider_impl_->createControllerService(type, longType, id, firstTimeAdded); + } + + std::shared_ptr<ControllerServiceNode> getControllerServiceNode(const std::string &id) const override { + return controller_service_provider_impl_->getControllerServiceNode(id); + } + + void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->removeControllerService(serviceNode); + } + + std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->enableControllerService(serviceNode); + } + + void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) override { + return controller_service_provider_impl_->enableControllerServices(serviceNodes); + } + + std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->disableControllerService(serviceNode); + } + + void clearControllerServices() override { + return controller_service_provider_impl_->clearControllerServices(); + } + + std::shared_ptr<ControllerService> getControllerService(const std::string &identifier) override { + return controller_service_provider_impl_->getControllerService(identifier); + } + + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices() override { + return controller_service_provider_impl_->getAllControllerServices(); + } + + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->unscheduleReferencingComponents(serviceNode); + } + + void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->verifyCanEnableReferencingServices(serviceNode); + } + + void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->verifyCanDisableReferencingServices(serviceNode); + } + + void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->verifyCanStopReferencingComponents(serviceNode); + } + + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->disableReferencingServices(serviceNode); + } + + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->enableReferencingServices(serviceNode); + } + + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { + return controller_service_provider_impl_->scheduleReferencingComponents(serviceNode); + } + + std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) override { + return controller_service_provider_impl_->getControllerServiceForComponent(serviceIdentifier, componentId); + } + + bool isControllerServiceEnabled(const std::string &identifier) override { + return controller_service_provider_impl_->isControllerServiceEnabled(identifier); + } + + bool isControllerServiceEnabling(const std::string &identifier) override { + return controller_service_provider_impl_->isControllerServiceEnabling(identifier); + } + + const std::string getControllerServiceName(const std::string &identifier) override { + return controller_service_provider_impl_->getControllerServiceName(identifier); + } + + void enableAllControllerServices() override { + return controller_service_provider_impl_->enableAllControllerServices(); + } + + void disableAllControllerServices() override { + return controller_service_provider_impl_->disableAllControllerServices(); + } + + protected: + std::shared_ptr<ControllerServiceProvider> controller_service_provider_impl_; +}; + +} // namespace controller +} // namespace core +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index 33f8a3a..1b3cc94 100644 --- a/libminifi/include/properties/Properties.h +++ b/libminifi/include/properties/Properties.h @@ -24,6 +24,7 @@ #include <vector> #include <string> #include <map> +#include <utility> #include "core/logging/Logger.h" #include "utils/OptionalUtils.h" @@ -34,6 +35,11 @@ namespace nifi { namespace minifi { class Properties { + struct PropertyValue { + std::string value; + bool changed; + }; + public: Properties(const std::string& name = ""); // NOLINT @@ -51,11 +57,11 @@ class Properties { // Set the config value void set(const std::string &key, const std::string &value) { std::lock_guard<std::mutex> lock(mutex_); - properties_[key] = value; + properties_[key] = PropertyValue{value, true}; dirty_ = true; } // Check whether the config value existed - bool has(std::string key) const { + bool has(const std::string& key) const { std::lock_guard<std::mutex> lock(mutex_); return properties_.count(key) > 0; } @@ -79,18 +85,7 @@ class Properties { * @param key key to look up * @returns the value if found, nullopt otherwise. */ - utils::optional<std::string> getString(const std::string& key) const { - std::string result; - const bool found = getString(key, result); - if (found) { - return result; - } else { - return utils::nullopt; - } - } - - // Parse one line in configure file like key=value - bool parseConfigureFileLine(char *buf, std::string &prop_key, std::string &prop_value); + utils::optional<std::string> getString(const std::string& key) const; /** * Load configure file @@ -100,7 +95,7 @@ class Properties { // Set the determined MINIFI_HOME void setHome(std::string minifiHome) { - minifi_home_ = minifiHome; + minifi_home_ = std::move(minifiHome); } std::vector<std::string> getConfiguredKeys() const { @@ -115,19 +110,16 @@ class Properties { std::string getHome() const { return minifi_home_; } - // Parse Command Line - void parseCommandLine(int argc, char **argv); bool persistProperties(); protected: - bool validateConfigurationFile(const std::string &file); - - std::map<std::string, std::string> properties_; - + std::map<std::string, std::string> getProperties() const; private: - std::atomic<bool> dirty_; + std::map<std::string, PropertyValue> properties_; + + bool dirty_{false}; std::string properties_file_; diff --git a/encrypt-config/ConfigFile.h b/libminifi/include/properties/PropertiesFile.h similarity index 55% copy from encrypt-config/ConfigFile.h copy to libminifi/include/properties/PropertiesFile.h index 8eba793..721d697 100644 --- a/encrypt-config/ConfigFile.h +++ b/libminifi/include/properties/PropertiesFile.h @@ -27,32 +27,34 @@ namespace org { namespace apache { namespace nifi { namespace minifi { -namespace encrypt_config { -class ConfigLine { +class PropertiesFile { public: - explicit ConfigLine(std::string line); - ConfigLine(std::string key, std::string value); - - void updateValue(const std::string& value); - - std::string getLine() const { return line_; } - std::string getKey() const { return key_; } - std::string getValue() const { return value_; } - - private: - // NOTE(fgerlits): having both line_ and { key_, value } is redundant in many cases, but - // * we need the original line_ in order to preserve formatting, comments and blank lines - // * we could get rid of key_ and value_ and parse them each time from line_, but I think the code is clearer this way - std::string line_; - std::string key_; - std::string value_; -}; + class Line { + public: + explicit Line(std::string line); + Line(std::string key, std::string value); -class ConfigFile { - public: - explicit ConfigFile(std::istream& input_stream); - explicit ConfigFile(std::istream&& input_stream) : ConfigFile{input_stream} {} + void updateValue(const std::string& value); + + std::string getLine() const { return line_; } + std::string getKey() const { return key_; } + std::string getValue() const { return value_; } + + static bool isValidKey(const std::string& key); + + private: + friend bool operator==(const Line&, const Line&); + // NOTE(fgerlits): having both line_ and { key_, value } is redundant in many cases, but + // * we need the original line_ in order to preserve formatting, comments and blank lines + // * we could get rid of key_ and value_ and parse them each time from line_, but I think the code is clearer this way + std::string line_; + std::string key_; + std::string value_; + }; + + explicit PropertiesFile(std::istream& input_stream); + explicit PropertiesFile(std::istream&& input_stream) : PropertiesFile{input_stream} {} bool hasValue(const std::string& key) const; utils::optional<std::string> getValue(const std::string& key) const; @@ -63,24 +65,22 @@ class ConfigFile { void writeTo(const std::string& file_path) const; - size_t size() const { return config_lines_.size(); } - - std::vector<std::string> getSensitiveProperties() const; + size_t size() const { return lines_.size(); } - private: - friend class ConfigFileTestAccessor; - friend bool operator==(const ConfigFile&, const ConfigFile&); - using Lines = std::vector<ConfigLine>; + protected: + using Lines = std::vector<Line>; Lines::const_iterator findKey(const std::string& key) const; Lines::iterator findKey(const std::string& key); - static std::vector<std::string> mergeProperties(std::vector<std::string> properties, - const std::vector<std::string>& additional_properties); - Lines config_lines_; + public: + Lines::const_iterator begin() const; + Lines::const_iterator end() const; + + protected: + Lines lines_; }; -} // namespace encrypt_config } // namespace minifi } // namespace nifi } // namespace apache diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h index 721b455..c65473a 100644 --- a/libminifi/include/utils/file/FileUtils.h +++ b/libminifi/include/utils/file/FileUtils.h @@ -403,7 +403,7 @@ inline int create_dir(const std::string& path, bool recursive = true) { #endif } -inline int copy_file(const std::string &path_from, const std::string dest_path) { +inline int copy_file(const std::string &path_from, const std::string& dest_path) { std::ifstream src(path_from, std::ios::binary); if (!src.is_open()) return -1; diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index fc92e36..25c7036 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -43,6 +43,7 @@ #include "core/ClassLoader.h" #include "SchedulingAgent.h" #include "core/controller/ControllerServiceProvider.h" +#include "core/controller/ForwardingControllerServiceProvider.h" #include "core/logging/LoggerConfiguration.h" #include "core/Connectable.h" #include "utils/file/PathUtils.h" @@ -61,13 +62,12 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string /*name*/, bool headless_mode, std::shared_ptr<utils::file::FileSystem> filesystem) - : core::controller::ControllerServiceProvider(core::getClassName<FlowController>()), + : core::controller::ForwardingControllerServiceProvider(core::getClassName<FlowController>()), c2::C2Client(std::move(configure), std::move(provenance_repo), std::move(flow_file_repo), std::move(content_repo), std::move(flow_configuration), std::move(filesystem)), running_(false), updating_(false), initialized_(false), - controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()), thread_pool_(2, false, nullptr, "Flowcontroller threadpool"), logger_(logging::LoggerFactory<FlowController>::getLogger()) { if (provenance_repo_ == nullptr) @@ -212,7 +212,7 @@ int16_t FlowController::stop() { this->flow_file_repo_->stop(); this->provenance_repo_->stop(); // stop the ControllerServices - this->controller_service_provider_->disableAllControllerServices(); + this->controller_service_provider_impl_->disableAllControllerServices(); running_ = false; } return 0; @@ -253,6 +253,37 @@ void FlowController::unload() { } } +std::unique_ptr<core::ProcessGroup> FlowController::loadInitialFlow() { + std::unique_ptr<core::ProcessGroup> root = flow_configuration_->getRoot(); + if (root) { + return root; + } + logger_->log_error("Couldn't load flow configuration file, trying to fetch it from C2 server"); + auto opt_flow_url = configuration_->get(Configure::nifi_c2_flow_url); + if (!opt_flow_url) { + logger_->log_error("No flow configuration url found"); + return nullptr; + } + // ensure that C2 connection is up and running + // since we don't have access to the flow definition, the C2 communication + // won't be able to use the services defined there, e.g. SSLContextService + controller_service_provider_impl_ = flow_configuration_->getControllerServiceProvider(); + C2Client::initialize(this, shared_from_this()); + auto opt_source = fetchFlow(*opt_flow_url); + if (!opt_source) { + logger_->log_error("Couldn't fetch flow configuration from C2 server"); + return nullptr; + } + root = flow_configuration_->updateFromPayload(*opt_flow_url, *opt_source); + if (root) { + logger_->log_info("Successfully fetched valid flow configuration"); + if (!flow_configuration_->persist(*opt_source)) { + logger_->log_info("Failed to write the fetched flow to disk"); + } + } + return root; +} + void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool reload) { std::lock_guard<std::recursive_mutex> flow_lock(mutex_); if (running_) { @@ -268,12 +299,12 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool this->root_ = root; } else { logger_->log_info("Instantiating new flow"); - this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot()); + this->root_ = std::shared_ptr<core::ProcessGroup>(loadInitialFlow()); } logger_->log_info("Loaded root processor Group"); logger_->log_info("Initializing timers"); - controller_service_provider_ = flow_configuration_->getControllerServiceProvider(); + controller_service_provider_impl_ = flow_configuration_->getControllerServiceProvider(); auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()); if (!thread_pool_.isRunning() || reload) { @@ -287,8 +318,8 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, !event_scheduler_ || reload); conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, !cron_scheduler_ || reload); - std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_); - std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent( + std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setRootGroup(root_); + std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setSchedulingAgent( std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_)); logger_->log_info("Loaded controller service provider"); @@ -337,7 +368,7 @@ int16_t FlowController::start() { } else { if (!running_) { logger_->log_info("Starting Flow Controller"); - controller_service_provider_->enableAllControllerServices(); + controller_service_provider_impl_->enableAllControllerServices(); this->timer_scheduler_->start(); this->event_scheduler_->start(); this->cron_scheduler_->start(); @@ -360,166 +391,6 @@ int16_t FlowController::start() { } } -/** - * Controller Service functions - * - */ - -/** - * Creates a controller service through the controller service provider impl. - * @param type class name - * @param id service identifier - * @param firstTimeAdded first time this CS was added - */ -std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool firstTimeAdded) { - return controller_service_provider_->createControllerService(type, fullType, id, firstTimeAdded); -} - -/** - * controller service provider - */ -/** - * removes controller service - * @param serviceNode service node to be removed. - */ - -void FlowController::removeControllerService(const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - controller_map_->removeControllerService(serviceNode); -} - -/** - * Enables the controller service services - * @param serviceNode service node which will be disabled, along with linked services. - */ -std::future<utils::TaskRescheduleInfo> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - return controller_service_provider_->enableControllerService(serviceNode); -} - -/** - * Enables controller services - * @param serviceNoden vector of service nodes which will be enabled, along with linked services. - */ -void FlowController::enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> /*serviceNodes*/) { -} - -/** - * Disables controller services - * @param serviceNode service node which will be disabled, along with linked services. - */ -std::future<utils::TaskRescheduleInfo> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - return controller_service_provider_->disableControllerService(serviceNode); -} - -/** - * Removes all controller services. - */ -void FlowController::clearControllerServices() { - controller_service_provider_->clearControllerServices(); -} - -/** - * Gets all controller services. - */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::getAllControllerServices() { - return controller_service_provider_->getAllControllerServices(); -} - -/** - * Gets the controller service for <code>identifier</code> - * @param identifier service identifier - * @return shared pointer to teh controller service implementation or nullptr if it does not exist. - */ -std::shared_ptr<core::controller::ControllerService> FlowController::getControllerService(const std::string &identifier) { - return controller_service_provider_->getControllerService(identifier); -} -/** - * Gets controller service node specified by <code>id</code> - * @param id service identifier - * @return shared pointer to the controller service node or nullptr if it does not exist. - */ -std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode(const std::string &id) const { - return controller_service_provider_->getControllerServiceNode(id); -} - -void FlowController::verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) { -} - -/** - * Unschedules referencing components. - */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - return controller_service_provider_->unscheduleReferencingComponents(serviceNode); -} - -/** - * Verify can disable referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ -void FlowController::verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - controller_service_provider_->verifyCanDisableReferencingServices(serviceNode); -} - -/** - * Disables referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - return controller_service_provider_->disableReferencingServices(serviceNode); -} - -/** - * Verify can enable referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ -void FlowController::verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - controller_service_provider_->verifyCanEnableReferencingServices(serviceNode); -} - -/** - * Determines if the controller service specified by identifier is enabled. - */ -bool FlowController::isControllerServiceEnabled(const std::string &identifier) { - return controller_service_provider_->isControllerServiceEnabled(identifier); -} - -/** - * Enables referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - return controller_service_provider_->enableReferencingServices(serviceNode); -} - -/** - * Schedules referencing components - * @param serviceNode service node whose referenced components will be scheduled. - */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - return controller_service_provider_->scheduleReferencingComponents(serviceNode); -} - -/** - * Returns controller service components referenced by serviceIdentifier from the embedded - * controller service provider; - */ -std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) { - return controller_service_provider_->getControllerServiceForComponent(serviceIdentifier, componentId); -} - -/** - * Enables all controller services for the provider. - */ -void FlowController::enableAllControllerServices() { - controller_service_provider_->enableAllControllerServices(); -} - -/** - * Disables all controller services for the provider. - */ -void FlowController::disableAllControllerServices() { - controller_service_provider_->disableAllControllerServices(); -} - int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration, bool persist) { if (applyConfiguration(source, configuration)) { if (persist) { diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp deleted file mode 100644 index 0e615f7..0000000 --- a/libminifi/src/Properties.cpp +++ /dev/null @@ -1,245 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "properties/Properties.h" -#include <string> -#include "utils/StringUtils.h" -#include "utils/file/FileUtils.h" -#include "utils/file/PathUtils.h" -#include "core/Core.h" -#include "core/logging/LoggerConfiguration.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { - -#define TRACE_BUFFER_SIZE 512 - -Properties::Properties(const std::string& name) - : logger_(logging::LoggerFactory<Properties>::getLogger()), - name_(name) { -} - -// Get the config value -bool Properties::getString(const std::string &key, std::string &value) const { - std::lock_guard<std::mutex> lock(mutex_); - auto it = properties_.find(key); - - if (it != properties_.end()) { - value = it->second; - return true; - } else { - return false; - } -} - -int Properties::getInt(const std::string &key, int default_value) const { - std::lock_guard<std::mutex> lock(mutex_); - auto it = properties_.find(key); - - return it != properties_.end() ? std::stoi(it->second) : default_value; -} - -// Parse one line in configure file like key=value -bool Properties::parseConfigureFileLine(char *buf, std::string &prop_key, std::string &prop_value) { - char *line = buf; - - while ((line[0] == ' ') || (line[0] == '\t')) - ++line; - - char first = line[0]; - if ((first == '\0') || (first == '#') || (first == '[') || (first == '\r') || (first == '\n') || (first == '=')) { - return true; - } - - char *equal = strchr(line, '='); - if (equal == NULL) { - return false; // invalid property as this is not a comment or property line - } - - equal[0] = '\0'; - std::string key = line; - - equal++; - while ((equal[0] == ' ') || (equal[0] == '\t')) - ++equal; - - first = equal[0]; - if ((first == '\0') || (first == '\r') || (first == '\n')) { - return true; // empty properties are okay - } - - std::string value = equal; - value = org::apache::nifi::minifi::utils::StringUtils::replaceEnvironmentVariables(value); - prop_key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key); - prop_value = org::apache::nifi::minifi::utils::StringUtils::trimRight(value); - return true; -} - -// Load Configure File -void Properties::loadConfigureFile(const char *fileName) { - if (NULL == fileName) { - logger_->log_error("Configuration file path for %s is a nullptr!", getName().c_str()); - return; - } - - properties_file_ = utils::file::getFullPath(utils::file::FileUtils::concat_path(getHome(), fileName)); - - logger_->log_info("Using configuration file to load configuration for %s from %s (located at %s)", getName().c_str(), fileName, properties_file_); - - std::ifstream file(properties_file_, std::ifstream::in); - if (!file.good()) { - logger_->log_error("load configure file failed %s", properties_file_); - return; - } - this->clear(); - - char buf[TRACE_BUFFER_SIZE]; - for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) { - std::string key, value; - if (parseConfigureFileLine(buf, key, value)) { - set(key, value); - } - } - dirty_ = false; -} - -bool Properties::validateConfigurationFile(const std::string &configFile) { - std::ifstream file(configFile, std::ifstream::in); - if (!file.good()) { - logger_->log_error("Failed to load configuration file %s to configure %s", configFile, getName().c_str()); - return false; - } - - char buf[TRACE_BUFFER_SIZE]; - for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) { - std::string key, value; - if (!parseConfigureFileLine(buf, key, value)) { - logger_->log_error("While loading configuration for %s found invalid line: %s", getName().c_str(), buf); - return false; - } - } - return true; -} - -bool Properties::persistProperties() { - std::lock_guard<std::mutex> lock(mutex_); - if (!dirty_) { - logger_->log_info("Attempt to persist, but properties are not updated"); - return true; - } - std::ifstream file(properties_file_, std::ifstream::in); - if (!file.good()) { - logger_->log_error("load configure file failed %s", properties_file_); - return false; - } - - std::map<std::string, std::string> properties_copy = properties_; - - std::string new_file = properties_file_ + ".new"; - - std::ofstream output_file(new_file, std::ios::out); - - char buf[TRACE_BUFFER_SIZE]; - for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) { - char *line = buf; - - char first = line[0]; - - if ((first == '\0') || (first == '#') || (first == '[') || (first == '\r') || (first == '\n') || (first == '=')) { - // persist comments and newlines - output_file << line << std::endl; - continue; - } - - - - char *equal = strchr(line, '='); - if (equal == NULL) { - output_file << line << std::endl; - continue; - } - - equal[0] = '\0'; - std::string key = line; - - equal++; - while ((equal[0] == ' ') || (equal[0] == '\t')) - ++equal; - - first = equal[0]; - if ((first == '\0') || (first == '\r') || (first == '\n')) { - output_file << line << std::endl; - continue; - } - - key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key); - std::string value = org::apache::nifi::minifi::utils::StringUtils::trimRight(equal); - auto hasIt = properties_copy.find(key); - if (hasIt != properties_copy.end() && !value.empty()) { - output_file << key << "=" << value << std::endl; - } - properties_copy.erase(key); - } - - for (const auto &kv : properties_copy) { - if (!kv.first.empty() && !kv.second.empty()) - output_file << kv.first << "=" << kv.second << std::endl; - } - output_file.close(); - - if (validateConfigurationFile(new_file)) { - const std::string backup = properties_file_ + ".bak"; - if (!utils::file::FileUtils::copy_file(properties_file_, backup) && !utils::file::FileUtils::copy_file(new_file, properties_file_)) { - logger_->log_info("Persisted %s", properties_file_); - return true; - } else { - logger_->log_error("Could not update %s", properties_file_); - } - } - - dirty_ = false; - - return false; -} - -// Parse Command Line -void Properties::parseCommandLine(int argc, char **argv) { - int i; - bool keyFound = false; - std::string key, value; - - for (i = 1; i < argc; i++) { - if (argv[i][0] == '-' && argv[i][1] != '\0') { - keyFound = true; - key = &argv[i][1]; - continue; - } - if (keyFound) { - value = argv[i]; - set(key, value); - keyFound = false; - } - } - return; -} - -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index eb49f3f..8f25c07 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -707,7 +707,7 @@ utils::TaskRescheduleInfo C2Agent::produce() { logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what()); } catch (...) { - logger_->log_error("Unknonwn exception occurred while performing heartbeat."); + logger_->log_error("Unknown exception occurred while performing heartbeat."); } } diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp index 70dd84d..d24fc2a 100644 --- a/libminifi/src/c2/C2Client.cpp +++ b/libminifi/src/c2/C2Client.cpp @@ -118,12 +118,21 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle loadC2ResponseConfiguration("nifi.c2.root.class.definitions"); if (!initialized_) { + // C2Agent is initialized once, meaning that a C2-triggered flow/configuration update + // might not be equal to a fresh restart c2_agent_ = std::unique_ptr<c2::C2Agent>(new c2::C2Agent(controller, update_sink, configuration_, filesystem_)); c2_agent_->start(); initialized_ = true; } } +utils::optional<std::string> C2Client::fetchFlow(const std::string& uri) const { + if (!c2_agent_) { + return {}; + } + return c2_agent_->fetchFlow(uri); +} + void C2Client::initializeComponentMetrics() { { std::lock_guard<std::mutex> guard(metrics_mutex_); diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index 79fe083..a53b669 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -63,7 +63,7 @@ const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &type) { std::vector<std::string> appenders; std::string prefix = type + "."; - for (auto const & entry : properties_) { + for (auto const & entry : getProperties()) { if (entry.first.rfind(prefix, 0) == 0 && entry.first.find(".", prefix.length() + 1) == std::string::npos) { appenders.push_back(entry.first); } diff --git a/libminifi/src/properties/Properties.cpp b/libminifi/src/properties/Properties.cpp new file mode 100644 index 0000000..a062fd0 --- /dev/null +++ b/libminifi/src/properties/Properties.cpp @@ -0,0 +1,150 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "properties/Properties.h" +#include <string> +#include "utils/StringUtils.h" +#include "utils/file/FileUtils.h" +#include "utils/file/PathUtils.h" +#include "core/Core.h" +#include "core/logging/LoggerConfiguration.h" +#include "properties/PropertiesFile.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +#define TRACE_BUFFER_SIZE 512 + +Properties::Properties(const std::string& name) + : logger_(logging::LoggerFactory<Properties>::getLogger()), + name_(name) { +} + +// Get the config value +bool Properties::getString(const std::string &key, std::string &value) const { + std::lock_guard<std::mutex> lock(mutex_); + auto it = properties_.find(key); + + if (it != properties_.end()) { + value = it->second.value; + return true; + } else { + return false; + } +} + +utils::optional<std::string> Properties::getString(const std::string& key) const { + std::string result; + const bool found = getString(key, result); + if (found) { + return result; + } else { + return utils::nullopt; + } +} + +int Properties::getInt(const std::string &key, int default_value) const { + std::lock_guard<std::mutex> lock(mutex_); + auto it = properties_.find(key); + + return it != properties_.end() ? std::stoi(it->second.value) : default_value; +} + +// Load Configure File +void Properties::loadConfigureFile(const char *fileName) { + std::lock_guard<std::mutex> lock(mutex_); + if (fileName == nullptr) { + logger_->log_error("Configuration file path for %s is a nullptr!", getName().c_str()); + return; + } + + properties_file_ = utils::file::getFullPath(utils::file::FileUtils::concat_path(getHome(), fileName)); + + logger_->log_info("Using configuration file to load configuration for %s from %s (located at %s)", + getName().c_str(), fileName, properties_file_); + + std::ifstream file(properties_file_, std::ifstream::in); + if (!file.good()) { + logger_->log_error("load configure file failed %s", properties_file_); + return; + } + properties_.clear(); + for (const auto& line : PropertiesFile{file}) { + properties_[line.getKey()] = {utils::StringUtils::replaceEnvironmentVariables(line.getValue()), false}; + } + dirty_ = false; +} + +bool Properties::persistProperties() { + std::lock_guard<std::mutex> lock(mutex_); + if (!dirty_) { + logger_->log_info("Attempt to persist, but properties are not updated"); + return true; + } + std::ifstream file(properties_file_, std::ifstream::in); + if (!file) { + logger_->log_error("load configure file failed %s", properties_file_); + return false; + } + + std::string new_file = properties_file_ + ".new"; + + PropertiesFile current_content{file}; + for (const auto& prop : properties_) { + if (!prop.second.changed) { + continue; + } + if (current_content.hasValue(prop.first)) { + current_content.update(prop.first, prop.second.value); + } else { + current_content.append(prop.first, prop.second.value); + } + } + + try { + current_content.writeTo(new_file); + } catch (const std::exception&) { + logger_->log_error("Could not update %s", properties_file_); + return false; + } + + const std::string backup = properties_file_ + ".bak"; + if (!utils::file::FileUtils::copy_file(properties_file_, backup) && !utils::file::FileUtils::copy_file(new_file, properties_file_)) { + logger_->log_info("Persisted %s", properties_file_); + dirty_ = false; + return true; + } + + logger_->log_error("Could not update %s", properties_file_); + return false; +} + +std::map<std::string, std::string> Properties::getProperties() const { + std::lock_guard<std::mutex> lock(mutex_); + std::map<std::string, std::string> properties; + for (const auto& prop : properties_) { + properties[prop.first] = prop.second.value; + } + return properties; +} + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/libminifi/src/properties/PropertiesFile.cpp b/libminifi/src/properties/PropertiesFile.cpp new file mode 100644 index 0000000..be69d92 --- /dev/null +++ b/libminifi/src/properties/PropertiesFile.cpp @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "properties/PropertiesFile.h" + +#include <algorithm> +#include <fstream> +#include <utility> +#include "utils/StringUtils.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +PropertiesFile::Line::Line(std::string line) : line_(line) { + line = utils::StringUtils::trim(line); + if (line.empty() || line[0] == '#') { return; } + + size_t index_of_first_equals_sign = line.find('='); + if (index_of_first_equals_sign == std::string::npos) { return; } + + std::string key = utils::StringUtils::trim(line.substr(0, index_of_first_equals_sign)); + if (key.empty()) { return; } + + key_ = key; + value_ = utils::StringUtils::trim(line.substr(index_of_first_equals_sign + 1)); +} + +PropertiesFile::Line::Line(std::string key, std::string value) + : line_{utils::StringUtils::join_pack(key, "=", value)}, key_{std::move(key)}, value_{std::move(value)} { +} + +bool PropertiesFile::Line::isValidKey(const std::string &key) { + return !key.empty(); +} + +void PropertiesFile::Line::updateValue(const std::string& value) { + auto pos = line_.find('='); + if (pos != std::string::npos) { + line_.replace(pos + 1, std::string::npos, value); + value_ = value; + } else { + throw std::invalid_argument{"Cannot update value in config line: it does not contain an = sign!"}; + } +} + +PropertiesFile::PropertiesFile(std::istream& input_stream) { + std::string line; + while (std::getline(input_stream, line)) { + lines_.push_back(Line{line}); + } +} + +PropertiesFile::Lines::const_iterator PropertiesFile::findKey(const std::string& key) const { + if (!Line::isValidKey(key)) { + return lines_.cend(); + } + return std::find_if(lines_.cbegin(), lines_.cend(), [&key](const Line& line) { + return line.getKey() == key; + }); +} + +PropertiesFile::Lines::iterator PropertiesFile::findKey(const std::string& key) { + if (!Line::isValidKey(key)) { + return lines_.end(); + } + return std::find_if(lines_.begin(), lines_.end(), [&key](const Line& line) { + return line.getKey() == key; + }); +} + +bool PropertiesFile::hasValue(const std::string& key) const { + return findKey(key) != lines_.end(); +} + +utils::optional<std::string> PropertiesFile::getValue(const std::string& key) const { + const auto it = findKey(key); + if (it != lines_.end()) { + return it->getValue(); + } else { + return utils::nullopt; + } +} + +void PropertiesFile::update(const std::string& key, const std::string& value) { + auto it = findKey(key); + if (it != lines_.end()) { + it->updateValue(value); + } else { + throw std::invalid_argument{"Key " + key + " not found in the config file!"}; + } +} + +void PropertiesFile::insertAfter(const std::string& after_key, const std::string& key, const std::string& value) { + auto it = findKey(after_key); + if (it != lines_.end()) { + ++it; + lines_.emplace(it, key, value); + } else { + throw std::invalid_argument{"Key " + after_key + " not found in the config file!"}; + } +} + +void PropertiesFile::append(const std::string& key, const std::string& value) { + lines_.emplace_back(key, value); +} + +int PropertiesFile::erase(const std::string& key) { + if (!Line::isValidKey(key)) { + return 0; + } + auto has_this_key = [&key](const Line& line) { return line.getKey() == key; }; + auto new_end = std::remove_if(lines_.begin(), lines_.end(), has_this_key); + auto num_removed = std::distance(new_end, lines_.end()); + lines_.erase(new_end, lines_.end()); + return gsl::narrow<int>(num_removed); +} + +void PropertiesFile::writeTo(const std::string& file_path) const { + try { + std::ofstream file{file_path}; + file.exceptions(std::ios::failbit | std::ios::badbit); + + for (const auto& line : lines_) { + file << line.getLine() << '\n'; + } + } catch (const std::exception&) { + throw std::runtime_error{"Could not write to file " + file_path}; + } +} + +PropertiesFile::Lines::const_iterator PropertiesFile::begin() const { + return lines_.begin(); +} + +PropertiesFile::Lines::const_iterator PropertiesFile::end() const { + return lines_.end(); +} + +} // namespace minifi +} // namespace nifi +} // namespace apache +} // namespace org diff --git a/libminifi/test/resources/TestEmpty.yml b/libminifi/test/resources/TestEmpty.yml new file mode 100644 index 0000000..a3e1837 --- /dev/null +++ b/libminifi/test/resources/TestEmpty.yml @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Flow Controller: + name: MiNiFi Flow +Processors: [] +Connections: [] +Remote Processing Groups: [] +Provenance Reporting: