This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 4faf65c  MINIFICPP-1407 - Fetch flow configuration file from C2 if not found locally
4faf65c is described below

commit 4faf65c1fccd7e273a8da4931170f49a314443e6
Author: Adam Debreceni <adam.debrec...@protonmail.com>
AuthorDate: Tue Nov 17 15:09:36 2020 +0100

    MINIFICPP-1407 - Fetch flow configuration file from C2 if not found locally
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #957
---
 encrypt-config/ConfigFile.cpp                      | 105 ---------
 encrypt-config/ConfigFile.h                        |  44 +---
 encrypt-config/tests/ConfigFileEncryptorTests.cpp  |  12 +-
 encrypt-config/tests/ConfigFileTests.cpp           |   7 +-
 ...UpdateTest.cpp => C2FetchFlowIfMissingTest.cpp} |  24 +-
 extensions/http-curl/tests/C2UpdateAgentTest.cpp   |   2 +-
 extensions/http-curl/tests/C2UpdateTest.cpp        |   2 +-
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/HTTPHandlers.h          |  47 ++--
 extensions/http-curl/tests/HTTPIntegrationBase.h   |  32 +++
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/FlowController.h                 | 126 +----------
 libminifi/include/c2/C2Agent.h                     |   4 +-
 libminifi/include/c2/C2Client.h                    |   5 +-
 .../core/controller/ControllerServiceProvider.h    |   2 +-
 .../ForwardingControllerServiceProvider.h          | 136 ++++++++++++
 libminifi/include/properties/Properties.h          |  36 ++-
 .../include/properties/PropertiesFile.h            |  68 +++---
 libminifi/include/utils/file/FileUtils.h           |   2 +-
 libminifi/src/FlowController.cpp                   | 207 ++++-------------
 libminifi/src/Properties.cpp                       | 245 ---------------------
 libminifi/src/c2/C2Agent.cpp                       |   2 +-
 libminifi/src/c2/C2Client.cpp                      |   9 +
 libminifi/src/core/logging/LoggerConfiguration.cpp |   2 +-
 libminifi/src/properties/Properties.cpp            | 150 +++++++++++++
 libminifi/src/properties/PropertiesFile.cpp        | 158 +++++++++++++
 libminifi/test/resources/TestEmpty.yml             |  21 ++
 27 files changed, 674 insertions(+), 777 deletions(-)

diff --git a/encrypt-config/ConfigFile.cpp b/encrypt-config/ConfigFile.cpp
index 4d6d0ca..b361d6b 100644
--- a/encrypt-config/ConfigFile.cpp
+++ b/encrypt-config/ConfigFile.cpp
@@ -35,111 +35,6 @@ namespace nifi {
 namespace minifi {
 namespace encrypt_config {
 
-ConfigLine::ConfigLine(std::string line) : line_(line) {
-  line = utils::StringUtils::trim(line);
-  if (line.empty() || line[0] == '#') { return; }
-
-  size_t index_of_first_equals_sign = line.find('=');
-  if (index_of_first_equals_sign == std::string::npos) { return; }
-
-  std::string key = utils::StringUtils::trim(line.substr(0, 
index_of_first_equals_sign));
-  if (key.empty()) { return; }
-
-  key_ = key;
-  value_ = utils::StringUtils::trim(line.substr(index_of_first_equals_sign + 
1));
-}
-
-ConfigLine::ConfigLine(std::string key, std::string value)
-  : line_{utils::StringUtils::join_pack(key, "=", value)}, 
key_{std::move(key)}, value_{std::move(value)} {
-}
-
-void ConfigLine::updateValue(const std::string& value) {
-  auto pos = line_.find('=');
-  if (pos != std::string::npos) {
-    line_.replace(pos + 1, std::string::npos, value);
-    value_ = value;
-  } else {
-    throw std::invalid_argument{"Cannot update value in config line: it does 
not contain an = sign!"};
-  }
-}
-
-ConfigFile::ConfigFile(std::istream& input_stream) {
-  std::string line;
-  while (std::getline(input_stream, line)) {
-    config_lines_.push_back(ConfigLine{line});
-  }
-}
-
-ConfigFile::Lines::const_iterator ConfigFile::findKey(const std::string& key) 
const {
-  return std::find_if(config_lines_.cbegin(), config_lines_.cend(), 
[&key](const ConfigLine& config_line) {
-    return config_line.getKey() == key;
-  });
-}
-
-ConfigFile::Lines::iterator ConfigFile::findKey(const std::string& key) {
-  return std::find_if(config_lines_.begin(), config_lines_.end(), [&key](const 
ConfigLine& config_line) {
-    return config_line.getKey() == key;
-  });
-}
-
-bool ConfigFile::hasValue(const std::string& key) const {
-  const auto it = findKey(key);
-  return (it != config_lines_.end());
-}
-
-utils::optional<std::string> ConfigFile::getValue(const std::string& key) 
const {
-  const auto it = findKey(key);
-  if (it != config_lines_.end()) {
-    return it->getValue();
-  } else {
-    return utils::nullopt;
-  }
-}
-
-void ConfigFile::update(const std::string& key, const std::string& value) {
-  auto it = findKey(key);
-  if (it != config_lines_.end()) {
-    it->updateValue(value);
-  } else {
-    throw std::invalid_argument{"Key " + key + " not found in the config 
file!"};
-  }
-}
-
-void ConfigFile::insertAfter(const std::string& after_key, const std::string& 
key, const std::string& value) {
-  auto it = findKey(after_key);
-  if (it != config_lines_.end()) {
-    ++it;
-    config_lines_.emplace(it, key, value);
-  } else {
-    throw std::invalid_argument{"Key " + after_key + " not found in the config 
file!"};
-  }
-}
-
-void ConfigFile::append(const std::string& key, const std::string& value) {
-  config_lines_.emplace_back(key, value);
-}
-
-int ConfigFile::erase(const std::string& key) {
-  auto has_this_key = [&key](const ConfigLine& line) { return line.getKey() == 
key; };
-  auto new_end = std::remove_if(config_lines_.begin(), config_lines_.end(), 
has_this_key);
-  auto num_removed = std::distance(new_end, config_lines_.end());
-  config_lines_.erase(new_end, config_lines_.end());
-  return gsl::narrow<int>(num_removed);
-}
-
-void ConfigFile::writeTo(const std::string& file_path) const {
-  try {
-    std::ofstream file{file_path};
-    file.exceptions(std::ios::failbit | std::ios::badbit);
-
-    for (const auto& config_line : config_lines_) {
-      file << config_line.getLine() << '\n';
-    }
-  } catch (const std::exception&) {
-    throw std::runtime_error{"Could not write to file " + file_path};
-  }
-}
-
 std::vector<std::string> ConfigFile::getSensitiveProperties() const {
   std::vector<std::string> 
sensitive_properties(DEFAULT_SENSITIVE_PROPERTIES.begin(), 
DEFAULT_SENSITIVE_PROPERTIES.end());
   const utils::optional<std::string> additional_sensitive_props_list = 
getValue(ADDITIONAL_SENSITIVE_PROPS_PROPERTY_NAME);
diff --git a/encrypt-config/ConfigFile.h b/encrypt-config/ConfigFile.h
index 8eba793..61b7a0f 100644
--- a/encrypt-config/ConfigFile.h
+++ b/encrypt-config/ConfigFile.h
@@ -22,6 +22,7 @@
 
 #include "utils/EncryptionUtils.h"
 #include "utils/OptionalUtils.h"
+#include "properties/PropertiesFile.h"
 
 namespace org {
 namespace apache {
@@ -29,55 +30,17 @@ namespace nifi {
 namespace minifi {
 namespace encrypt_config {
 
-class ConfigLine {
+class ConfigFile : public PropertiesFile {
  public:
-  explicit ConfigLine(std::string line);
-  ConfigLine(std::string key, std::string value);
-
-  void updateValue(const std::string& value);
-
-  std::string getLine() const { return line_; }
-  std::string getKey() const { return key_; }
-  std::string getValue() const { return value_; }
-
- private:
-  // NOTE(fgerlits): having both line_ and { key_, value } is redundant in 
many cases, but
-  // * we need the original line_ in order to preserve formatting, comments 
and blank lines
-  // * we could get rid of key_ and value_ and parse them each time from 
line_, but I think the code is clearer this way
-  std::string line_;
-  std::string key_;
-  std::string value_;
-};
-
-class ConfigFile {
- public:
-  explicit ConfigFile(std::istream& input_stream);
-  explicit ConfigFile(std::istream&& input_stream) : ConfigFile{input_stream} 
{}
-
-  bool hasValue(const std::string& key) const;
-  utils::optional<std::string> getValue(const std::string& key) const;
-  void update(const std::string& key, const std::string& value);
-  void insertAfter(const std::string& after_key, const std::string& key, const 
std::string& value);
-  void append(const std::string& key, const std::string& value);
-  int erase(const std::string& key);
-
-  void writeTo(const std::string& file_path) const;
-
-  size_t size() const { return config_lines_.size(); }
+  using PropertiesFile::PropertiesFile;
 
   std::vector<std::string> getSensitiveProperties() const;
 
  private:
   friend class ConfigFileTestAccessor;
   friend bool operator==(const ConfigFile&, const ConfigFile&);
-  using Lines = std::vector<ConfigLine>;
-
-  Lines::const_iterator findKey(const std::string& key) const;
-  Lines::iterator findKey(const std::string& key);
   static std::vector<std::string> mergeProperties(std::vector<std::string> 
properties,
                                                   const 
std::vector<std::string>& additional_properties);
-
-  Lines config_lines_;
 };
 
 }  // namespace encrypt_config
@@ -85,3 +48,4 @@ class ConfigFile {
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
+
diff --git a/encrypt-config/tests/ConfigFileEncryptorTests.cpp 
b/encrypt-config/tests/ConfigFileEncryptorTests.cpp
index e20791a..8d07abb 100644
--- a/encrypt-config/tests/ConfigFileEncryptorTests.cpp
+++ b/encrypt-config/tests/ConfigFileEncryptorTests.cpp
@@ -46,15 +46,17 @@ namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
-namespace encrypt_config {
 
 // NOTE(fgerlits): these ==/!= operators are in the test file on purpose, and 
should not be part of production code,
 // as they take a varying amount of time depending on which character in the 
line differs, so they would open up
 // our code to timing attacks.  If you need == in production code, make sure 
to compare all pairs of chars/lines.
-bool operator==(const ConfigLine& left, const ConfigLine& right) { return 
left.getLine() == right.getLine(); }
-bool operator!=(const ConfigLine& left, const ConfigLine& right) { return 
!(left == right); }
-bool operator==(const ConfigFile& left, const ConfigFile& right) { return 
left.config_lines_ == right.config_lines_; }
-bool operator!=(const ConfigFile& left, const ConfigFile& right) { return 
!(left == right); }
+bool operator==(const PropertiesFile::Line& left, const PropertiesFile::Line& 
right) { return left.getLine() == right.getLine(); }
+
+namespace encrypt_config {
+
+bool operator!=(const ConfigFile::Line& left, const ConfigFile::Line& right) { 
return !(left == right); }
+bool operator==(const  ConfigFile& left, const  ConfigFile& right) { return 
left.lines_ == right.lines_; }
+bool operator!=(const  ConfigFile& left, const  ConfigFile& right) { return 
!(left == right); }
 
 }  // namespace encrypt_config
 }  // namespace minifi
diff --git a/encrypt-config/tests/ConfigFileTests.cpp 
b/encrypt-config/tests/ConfigFileTests.cpp
index 36cc214..04e7a94 100644
--- a/encrypt-config/tests/ConfigFileTests.cpp
+++ b/encrypt-config/tests/ConfigFileTests.cpp
@@ -43,11 +43,10 @@ class ConfigFileTestAccessor {
 
 using org::apache::nifi::minifi::encrypt_config::ConfigFile;
 using org::apache::nifi::minifi::encrypt_config::ConfigFileTestAccessor;
-using org::apache::nifi::minifi::encrypt_config::ConfigLine;
 
 TEST_CASE("ConfigLine can be constructed from a line", 
"[encrypt-config][constructor]") {
   auto line_is_parsed_correctly = [](const std::string& line, const 
std::string& expected_key, const std::string& expected_value) {
-    ConfigLine config_line{line};
+    ConfigFile::Line config_line{line};
     return config_line.getKey() == expected_key && config_line.getValue() == 
expected_value;
   };
 
@@ -69,7 +68,7 @@ TEST_CASE("ConfigLine can be constructed from a line", 
"[encrypt-config][constru
 
 TEST_CASE("ConfigLine can be constructed from a key-value pair", 
"[encrypt-config][constructor]") {
   auto can_construct_from_kv = [](const std::string& key, const std::string& 
value, const std::string& expected_line) {
-    ConfigLine config_line{key, value};
+    ConfigFile::Line config_line{key, value};
     return config_line.getLine() == expected_line;
   };
 
@@ -79,7 +78,7 @@ TEST_CASE("ConfigLine can be constructed from a key-value 
pair", "[encrypt-confi
 
 TEST_CASE("ConfigLine can update the value", "[encrypt-config][updateValue]") {
   auto can_update_value = [](const std::string& original_line, const 
std::string& new_value, const std::string& expected_line) {
-    ConfigLine config_line{original_line};
+    ConfigFile::Line config_line{original_line};
     config_line.updateValue(new_value);
     return config_line.getLine() == expected_line;
   };
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp 
b/extensions/http-curl/tests/C2FetchFlowIfMissingTest.cpp
similarity index 65%
copy from extensions/http-curl/tests/C2UpdateTest.cpp
copy to extensions/http-curl/tests/C2FetchFlowIfMissingTest.cpp
index 24d4693..12133cc 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2FetchFlowIfMissingTest.cpp
@@ -19,23 +19,27 @@
 #undef NDEBUG
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
-#include "utils/gsl.h"
 #include "utils/IntegrationTestUtils.h"
+#include "utils/file/PathUtils.h"
 
 int main(int argc, char **argv) {
-  const cmd_args args = parse_cmdline_args(argc, argv, "update");
-  C2UpdateHandler handler(args.test_file);
-  VerifyC2Update harness(10000);
+  TestController controller;
+  char format[] = "/var/tmp/c2.XXXXXX";
+  std::string minifi_home = controller.createTempDirectory(format);
+  const cmd_args args = parse_cmdline_args(argc, argv);
+  C2FlowProvider handler(args.test_file);
+  VerifyFlowFetched harness(10000);
   harness.setKeyDir(args.key_dir);
   harness.setUrl(args.url, &handler);
-  handler.setC2RestResponse(harness.getC2RestUrl(), "configuration");
+  harness.setFlowUrl(harness.getC2RestUrl());
 
-  const auto start = std::chrono::system_clock::now();
+  std::string config_path = utils::file::PathUtils::concat_path(minifi_home, 
"config.yml");
 
-  harness.run(args.test_file);
+  harness.run(config_path);
+
+  // check existence of the config file
+  assert(std::ifstream{config_path});
 
-  const auto then = std::chrono::system_clock::now();
-  const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - 
start).count();
-  assert(handler.calls_ <= gsl::narrow<size_t>(seconds + 1));
   return 0;
 }
+
diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp 
b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
index 5e61f7e..8c91363 100644
--- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
@@ -35,6 +35,6 @@ int main(int argc, char **argv) {
 
   const auto then = std::chrono::system_clock::now();
   const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - 
start).count();
-  assert(handler.calls_ <= gsl::narrow<size_t>(seconds + 2));
+  assert(handler.getCallCount() <= gsl::narrow<size_t>(seconds + 2));
   return 0;
 }
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp 
b/extensions/http-curl/tests/C2UpdateTest.cpp
index 24d4693..c63dfc3 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -36,6 +36,6 @@ int main(int argc, char **argv) {
 
   const auto then = std::chrono::system_clock::now();
   const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - 
start).count();
-  assert(handler.calls_ <= gsl::narrow<size_t>(seconds + 1));
+  assert(handler.getCallCount() <= gsl::narrow<size_t>(seconds + 1));
   return 0;
 }
diff --git a/extensions/http-curl/tests/CMakeLists.txt 
b/extensions/http-curl/tests/CMakeLists.txt
index 04a9627..e6ff766 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -71,6 +71,7 @@ add_test(NAME HTTPStreamingCallbackTests COMMAND 
"HTTPStreamingCallbackTests" WO
 
 add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest 
"${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2UpdateTest COMMAND C2UpdateTest 
"${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2FetchFlowIfMissingTest COMMAND C2FetchFlowIfMissingTest 
"${TEST_RESOURCES}/TestEmpty.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2ConfigEncryption COMMAND C2ConfigEncryption 
"${TEST_RESOURCES}/decrypted.config.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2JstackTest COMMAND C2JstackTest 
"${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2DescribeManifestTest COMMAND C2DescribeManifestTest 
"${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPHandlers.h 
b/extensions/http-curl/tests/HTTPHandlers.h
index 4e7d62b..6867c77 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -448,20 +448,20 @@ class HeartbeatHandler : public ServerAwareHandler {
   }
 };
 
-class C2UpdateHandler : public ServerAwareHandler {
+class C2FlowProvider : public ServerAwareHandler {
  public:
-  explicit C2UpdateHandler(const std::string& test_file_location)
-    : test_file_location_(test_file_location) {
+  explicit C2FlowProvider(std::string test_file_location)
+      : test_file_location_(std::move(test_file_location)) {
   }
 
-  bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) 
override {
-    calls_++;
-    if (!response_.empty()) {
+  bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override 
{
+    std::ifstream myfile(test_file_location_.c_str(), std::ios::in | 
std::ios::binary);
+    if (myfile.good()) {
+      std::string str((std::istreambuf_iterator<char>(myfile)), 
(std::istreambuf_iterator<char>()));
       mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
-                response_.length());
-      mg_printf(conn, "%s", response_.c_str());
-      response_.clear();
+                      "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
+                str.length());
+      mg_printf(conn, "%s", str.c_str());
     } else {
       mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
     }
@@ -469,14 +469,22 @@ class C2UpdateHandler : public ServerAwareHandler {
     return true;
   }
 
-  bool handleGet(CivetServer* /*server*/, struct mg_connection *conn) override 
{
-    std::ifstream myfile(test_file_location_.c_str(), std::ios::in | 
std::ios::binary);
-    if (myfile.good()) {
-      std::string str((std::istreambuf_iterator<char>(myfile)), 
(std::istreambuf_iterator<char>()));
+ private:
+  const std::string test_file_location_;
+};
+
+class C2UpdateHandler : public C2FlowProvider {
+ public:
+  using C2FlowProvider::C2FlowProvider;
+
+  bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) 
override {
+    calls_++;
+    if (!response_.empty()) {
       mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
                 "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
-                str.length());
-      mg_printf(conn, "%s", str.c_str());
+                response_.length());
+      mg_printf(conn, "%s", response_.c_str());
+      response_.clear();
     } else {
       mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
     }
@@ -499,9 +507,14 @@ class C2UpdateHandler : public ServerAwareHandler {
             "\"content\": " + content + "}]}";
   }
 
+  size_t getCallCount() const {
+    return calls_;
+  }
+
+ protected:
   std::atomic<size_t> calls_{0};
+
  private:
-  std::string test_file_location_;
   std::string response_;
 };
 
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h 
b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 29e5672..0c01f04 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -189,6 +189,38 @@ public:
   }
 };
 
+class VerifyFlowFetched : public HTTPIntegrationBase {
+ public:
+  using HTTPIntegrationBase::HTTPIntegrationBase;
+
+  void testSetup() override {
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+  }
+
+  void configureC2() override {
+    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+    configuration->set("nifi.c2.enable", "true");
+    configuration->set("nifi.c2.agent.class", "test");
+    configuration->set("nifi.c2.agent.heartbeat.period", "1000");
+  }
+
+  void setFlowUrl(const std::string& url) {
+    configuration->set(minifi::Configure::nifi_c2_flow_url, url);
+  }
+
+  void cleanup() override {
+    LogTestController::getInstance().reset();
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10), 
"Successfully fetched valid flow configuration"));
+  }
+};
+
 class VerifyC2UpdateAgent : public VerifyC2Update {
  public:
   explicit VerifyC2UpdateAgent(uint64_t waitTime)
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 9366ed6..c7140ca 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -89,7 +89,7 @@ if (NOT OPENSSL_OFF)
        set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES  "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  
"src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" 
"src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" 
${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" 
"src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  
"src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" 
"src/serialization/*.cpp" "src/provenance/*.cpp" "src/ut [...]
+file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" 
"src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" 
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" 
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} 
"src/core/controller/*.cpp" "src/controllers/*.cpp" 
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" 
"src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/serialization/*.cpp" 
"src/pro [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "src/agent/agent_version.cpp")
 
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index 6c4a00e..9ed88a3 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -35,6 +35,7 @@
 #include "Connection.h"
 #include "core/controller/ControllerServiceNode.h"
 #include "core/controller/ControllerServiceProvider.h"
+#include "core/controller/ForwardingControllerServiceProvider.h"
 #include "core/FlowConfiguration.h"
 #include "core/logging/Logger.h"
 #include "core/ProcessContext.h"
@@ -68,7 +69,7 @@ namespace minifi {
  * Flow Controller class. Generally used by FlowController factory
  * as a singleton.
  */
-class FlowController : public core::controller::ControllerServiceProvider,  
public state::StateMonitor, public c2::C2Client, public 
std::enable_shared_from_this<FlowController> {
+class FlowController : public 
core::controller::ForwardingControllerServiceProvider,  public 
state::StateMonitor, public c2::C2Client, public 
std::enable_shared_from_this<FlowController> {
  public:
   FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo,
                  std::shared_ptr<Configure> configure, 
std::unique_ptr<core::FlowConfiguration> flow_configuration,
@@ -182,119 +183,6 @@ class FlowController : public 
core::controller::ControllerServiceProvider,  publ
   }
 
   /**
-   * Creates a controller service through the controller service provider impl.
-   * @param type class name
-   * @param id service identifier
-   * @param firstTimeAdded first time this CS was added
-   */
-  std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string &type, const std::string &fullType, 
const std::string &id, bool firstTimeAdded) override;
-
-  /**
-   * controller service provider
-   */
-  /**
-   * removes controller service
-   * @param serviceNode service node to be removed.
-   */
-
-  void removeControllerService(const 
std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override;
-
-  /**
-   * Enables the controller service services
-   * @param serviceNode service node which will be disabled, along with linked 
services.
-   */
-  std::future<utils::TaskRescheduleInfo> 
enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
-
-  /**
-   * Enables controller services
-   * @param serviceNoden vector of service nodes which will be enabled, along 
with linked services.
-   */
-  void 
enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>
 serviceNodes) override;
-
-  /**
-   * Disables controller services
-   * @param serviceNode service node which will be disabled, along with linked 
services.
-   */
-  std::future<utils::TaskRescheduleInfo> 
disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
-
-  /**
-   * Removes all controller services.
-   */
-  void clearControllerServices() override;
-
-  /**
-   * Gets all controller services.
-   */
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
getAllControllerServices() override;
-
-  std::shared_ptr<core::controller::ControllerService> 
getControllerService(const std::string &identifier) override;
-
-  /**
-   * Gets controller service node specified by <code>id</code>
-   * @param id service identifier
-   * @return shared pointer to the controller service node or nullptr if it 
does not exist.
-   */
-  std::shared_ptr<core::controller::ControllerServiceNode> 
getControllerServiceNode(const std::string &id) const override;
-
-  void 
verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
-
-  /**
-   * Unschedules referencing components.
-   */
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
-
-  /**
-   * Verify can disable referencing components
-   * @param serviceNode service node whose referenced components will be 
scheduled.
-   */
-  void 
verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
-
-  /**
-   * Disables referencing components
-   * @param serviceNode service node whose referenced components will be 
scheduled.
-   */
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
-
-  /**
-   * Verify can enable referencing components
-   * @param serviceNode service node whose referenced components will be 
scheduled.
-   */
-  void 
verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>&)
 override;
-
-  /**
-   * Determines if the controller service specified by identifier is enabled.
-   */
-  bool isControllerServiceEnabled(const std::string &identifier) override;
-
-  /**
-   * Enables referencing components
-   * @param serviceNode service node whose referenced components will be 
scheduled.
-   */
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
-
-  /**
-   * Schedules referencing components
-   * @param serviceNode service node whose referenced components will be 
scheduled.
-   */
-  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override;
-
-  /**
-   * Returns controller service components referenced by serviceIdentifier 
from the embedded
-   * controller service provider;
-   */
-  std::shared_ptr<core::controller::ControllerService> 
getControllerServiceForComponent(const std::string &serviceIdentifier, const 
utils::Identifier &componentId) override;
-
-  /**
-   * Enables all controller services for the provider.
-   */
-  void enableAllControllerServices() override;
-
-  /**
-   * Disables all controller services for the provider.
-   */
-  void disableAllControllerServices() override;
-
-  /**
    * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE 
manifest
    * @return the agent manifest response node
    */
@@ -304,6 +192,14 @@ class FlowController : public 
core::controller::ControllerServiceProvider,  publ
 
   std::vector<BackTrace> getTraces() override;
 
+ private:
+  /**
+   * Loads the flow as specified in the flow config file or if not present
+   * tries to fetch it from the C2 server (if enabled).
+   * @return the built flow
+   */
+  std::unique_ptr<core::ProcessGroup> loadInitialFlow();
+
  protected:
   // function to load the flow file repo.
   void loadFlowRepo();
@@ -339,8 +235,6 @@ class FlowController : public 
core::controller::ControllerServiceProvider,  publ
   std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
   // FlowControl Protocol
   std::unique_ptr<FlowControlProtocol> protocol_;
-  std::shared_ptr<core::controller::ControllerServiceMap> 
controller_service_map_;
-  std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider_;
   // metrics information
   std::chrono::steady_clock::time_point start_time_;
 
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 4e61300..9cc6c28 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -87,6 +87,8 @@ class C2Agent : public state::UpdateController {
     return heart_beat_period_;
   }
 
+  utils::optional<std::string> fetchFlow(const std::string& uri) const;
+
  protected:
   void restart_agent();
 
@@ -161,8 +163,6 @@ class C2Agent : public state::UpdateController {
 
   bool handleConfigurationUpdate(const C2ContentResponse &resp);
 
-  utils::optional<std::string> fetchFlow(const std::string& uri) const;
-
  protected:
   std::timed_mutex metrics_mutex_;
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> 
metrics_map_;
diff --git a/libminifi/include/c2/C2Client.h b/libminifi/include/c2/C2Client.h
index 7f81761..982d6cf 100644
--- a/libminifi/include/c2/C2Client.h
+++ b/libminifi/include/c2/C2Client.h
@@ -23,7 +23,7 @@
 #include <map>
 #include <string>
 #include <mutex>
-#include "core/state/UpdateController.h"
+#include "c2/C2Agent.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "properties/Configure.h"
 #include "core/logging/Logger.h"
@@ -58,6 +58,7 @@ class C2Client : public core::Flow, public 
state::response::NodeReporter {
 
  protected:
   bool isC2Enabled() const;
+  utils::optional<std::string> fetchFlow(const std::string& uri) const;
 
  private:
   void initializeComponentMetrics();
@@ -67,9 +68,9 @@ class C2Client : public core::Flow, public 
state::response::NodeReporter {
  protected:
   std::shared_ptr<Configure> configuration_;
   std::shared_ptr<utils::file::FileSystem> filesystem_;
-  std::unique_ptr<state::UpdateController> c2_agent_;
 
  private:
+  std::unique_ptr<C2Agent> c2_agent_;
   std::atomic_bool initialized_{false};
   std::shared_ptr<logging::Logger> logger_;
 
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h 
b/libminifi/include/core/controller/ControllerServiceProvider.h
index 3c99265..24861eb 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -225,7 +225,7 @@ class ControllerServiceProvider : public CoreComponent, 
public ConfigurableCompo
 
   virtual void disableAllControllerServices() = 0;
 
-  virtual bool supportsDynamicProperties() {
+  bool supportsDynamicProperties() final {
     return false;
   }
 
diff --git 
a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h 
b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
new file mode 100644
index 0000000..9aa4114
--- /dev/null
+++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h
@@ -0,0 +1,136 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+#include <string>
+
+#include "ControllerServiceProvider.h"
+#include "ControllerServiceNode.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace controller {
+
+class ForwardingControllerServiceProvider : public ControllerServiceProvider {
+ public:
+  using ControllerServiceProvider::ControllerServiceProvider;
+
+  std::shared_ptr<ControllerServiceNode> createControllerService(const 
std::string &type, const std::string &longType, const std::string &id, bool 
firstTimeAdded) override {
+    return controller_service_provider_impl_->createControllerService(type, 
longType, id, firstTimeAdded);
+  }
+
+  std::shared_ptr<ControllerServiceNode> getControllerServiceNode(const 
std::string &id) const override {
+    return controller_service_provider_impl_->getControllerServiceNode(id);
+  }
+
+  void removeControllerService(const std::shared_ptr<ControllerServiceNode> 
&serviceNode) override {
+    return 
controller_service_provider_impl_->removeControllerService(serviceNode);
+  }
+
+  std::future<utils::TaskRescheduleInfo> 
enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) 
override {
+    return 
controller_service_provider_impl_->enableControllerService(serviceNode);
+  }
+
+  void 
enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>
 serviceNodes) override {
+    return 
controller_service_provider_impl_->enableControllerServices(serviceNodes);
+  }
+
+  std::future<utils::TaskRescheduleInfo> 
disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override {
+    return 
controller_service_provider_impl_->disableControllerService(serviceNode);
+  }
+
+  void clearControllerServices() override {
+    return controller_service_provider_impl_->clearControllerServices();
+  }
+
+  std::shared_ptr<ControllerService> getControllerService(const std::string 
&identifier) override {
+    return controller_service_provider_impl_->getControllerService(identifier);
+  }
+
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
getAllControllerServices() override {
+    return controller_service_provider_impl_->getAllControllerServices();
+  }
+
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override {
+    return 
controller_service_provider_impl_->unscheduleReferencingComponents(serviceNode);
+  }
+
+  void 
verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override {
+    return 
controller_service_provider_impl_->verifyCanEnableReferencingServices(serviceNode);
+  }
+
+  void 
verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override {
+    return 
controller_service_provider_impl_->verifyCanDisableReferencingServices(serviceNode);
+  }
+
+  void 
verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override {
+    return 
controller_service_provider_impl_->verifyCanStopReferencingComponents(serviceNode);
+  }
+
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override {
+    return 
controller_service_provider_impl_->disableReferencingServices(serviceNode);
+  }
+
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override {
+    return 
controller_service_provider_impl_->enableReferencingServices(serviceNode);
+  }
+
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) override {
+    return 
controller_service_provider_impl_->scheduleReferencingComponents(serviceNode);
+  }
+
+  std::shared_ptr<ControllerService> getControllerServiceForComponent(const 
std::string &serviceIdentifier, const utils::Identifier &componentId) override {
+    return 
controller_service_provider_impl_->getControllerServiceForComponent(serviceIdentifier,
 componentId);
+  }
+
+  bool isControllerServiceEnabled(const std::string &identifier) override {
+    return 
controller_service_provider_impl_->isControllerServiceEnabled(identifier);
+  }
+
+  bool isControllerServiceEnabling(const std::string &identifier) override {
+    return 
controller_service_provider_impl_->isControllerServiceEnabling(identifier);
+  }
+
+  const std::string getControllerServiceName(const std::string &identifier) 
override {
+    return 
controller_service_provider_impl_->getControllerServiceName(identifier);
+  }
+
+  void enableAllControllerServices() override {
+    return controller_service_provider_impl_->enableAllControllerServices();
+  }
+
+  void disableAllControllerServices() override {
+    return controller_service_provider_impl_->disableAllControllerServices();
+  }
+
+ protected:
+  std::shared_ptr<ControllerServiceProvider> controller_service_provider_impl_;
+};
+
+}  // namespace controller
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/properties/Properties.h 
b/libminifi/include/properties/Properties.h
index 33f8a3a..1b3cc94 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -24,6 +24,7 @@
 #include <vector>
 #include <string>
 #include <map>
+#include <utility>
 
 #include "core/logging/Logger.h"
 #include "utils/OptionalUtils.h"
@@ -34,6 +35,11 @@ namespace nifi {
 namespace minifi {
 
 class Properties {
+  struct PropertyValue {
+    std::string value;
+    bool changed;
+  };
+
  public:
   Properties(const std::string& name = ""); // NOLINT
 
@@ -51,11 +57,11 @@ class Properties {
   // Set the config value
   void set(const std::string &key, const std::string &value) {
     std::lock_guard<std::mutex> lock(mutex_);
-    properties_[key] = value;
+    properties_[key] = PropertyValue{value, true};
     dirty_ = true;
   }
   // Check whether the config value existed
-  bool has(std::string key) const {
+  bool has(const std::string& key) const {
     std::lock_guard<std::mutex> lock(mutex_);
     return properties_.count(key) > 0;
   }
@@ -79,18 +85,7 @@ class Properties {
    * @param key key to look up
    * @returns the value if found, nullopt otherwise.
    */
-  utils::optional<std::string> getString(const std::string& key) const {
-    std::string result;
-    const bool found = getString(key, result);
-    if (found) {
-      return result;
-    } else {
-      return utils::nullopt;
-    }
-  }
-
-  // Parse one line in configure file like key=value
-  bool parseConfigureFileLine(char *buf, std::string &prop_key, std::string 
&prop_value);
+  utils::optional<std::string> getString(const std::string& key) const;
 
   /**
    * Load configure file
@@ -100,7 +95,7 @@ class Properties {
 
   // Set the determined MINIFI_HOME
   void setHome(std::string minifiHome) {
-    minifi_home_ = minifiHome;
+    minifi_home_ = std::move(minifiHome);
   }
 
   std::vector<std::string> getConfiguredKeys() const {
@@ -115,19 +110,16 @@ class Properties {
   std::string getHome() const {
     return minifi_home_;
   }
-  // Parse Command Line
-  void parseCommandLine(int argc, char **argv);
 
   bool persistProperties();
 
  protected:
-  bool validateConfigurationFile(const std::string &file);
-
-  std::map<std::string, std::string> properties_;
-
+  std::map<std::string, std::string> getProperties() const;
 
  private:
-  std::atomic<bool> dirty_;
+  std::map<std::string, PropertyValue> properties_;
+
+  bool dirty_{false};
 
   std::string properties_file_;
 
diff --git a/encrypt-config/ConfigFile.h 
b/libminifi/include/properties/PropertiesFile.h
similarity index 55%
copy from encrypt-config/ConfigFile.h
copy to libminifi/include/properties/PropertiesFile.h
index 8eba793..721d697 100644
--- a/encrypt-config/ConfigFile.h
+++ b/libminifi/include/properties/PropertiesFile.h
@@ -27,32 +27,34 @@ namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
-namespace encrypt_config {
 
-class ConfigLine {
+class PropertiesFile {
  public:
-  explicit ConfigLine(std::string line);
-  ConfigLine(std::string key, std::string value);
-
-  void updateValue(const std::string& value);
-
-  std::string getLine() const { return line_; }
-  std::string getKey() const { return key_; }
-  std::string getValue() const { return value_; }
-
- private:
-  // NOTE(fgerlits): having both line_ and { key_, value } is redundant in 
many cases, but
-  // * we need the original line_ in order to preserve formatting, comments 
and blank lines
-  // * we could get rid of key_ and value_ and parse them each time from 
line_, but I think the code is clearer this way
-  std::string line_;
-  std::string key_;
-  std::string value_;
-};
+  class Line {
+   public:
+    explicit Line(std::string line);
+    Line(std::string key, std::string value);
 
-class ConfigFile {
- public:
-  explicit ConfigFile(std::istream& input_stream);
-  explicit ConfigFile(std::istream&& input_stream) : ConfigFile{input_stream} 
{}
+    void updateValue(const std::string& value);
+
+    std::string getLine() const { return line_; }
+    std::string getKey() const { return key_; }
+    std::string getValue() const { return value_; }
+
+    static bool isValidKey(const std::string& key);
+
+   private:
+    friend bool operator==(const Line&, const Line&);
+    // NOTE(fgerlits): having both line_ and { key_, value } is redundant in 
many cases, but
+    // * we need the original line_ in order to preserve formatting, comments 
and blank lines
+    // * we could get rid of key_ and value_ and parse them each time from 
line_, but I think the code is clearer this way
+    std::string line_;
+    std::string key_;
+    std::string value_;
+  };
+
+  explicit PropertiesFile(std::istream& input_stream);
+  explicit PropertiesFile(std::istream&& input_stream) : 
PropertiesFile{input_stream} {}
 
   bool hasValue(const std::string& key) const;
   utils::optional<std::string> getValue(const std::string& key) const;
@@ -63,24 +65,22 @@ class ConfigFile {
 
   void writeTo(const std::string& file_path) const;
 
-  size_t size() const { return config_lines_.size(); }
-
-  std::vector<std::string> getSensitiveProperties() const;
+  size_t size() const { return lines_.size(); }
 
- private:
-  friend class ConfigFileTestAccessor;
-  friend bool operator==(const ConfigFile&, const ConfigFile&);
-  using Lines = std::vector<ConfigLine>;
+ protected:
+  using Lines = std::vector<Line>;
 
   Lines::const_iterator findKey(const std::string& key) const;
   Lines::iterator findKey(const std::string& key);
-  static std::vector<std::string> mergeProperties(std::vector<std::string> 
properties,
-                                                  const 
std::vector<std::string>& additional_properties);
 
-  Lines config_lines_;
+ public:
+  Lines::const_iterator begin() const;
+  Lines::const_iterator end() const;
+
+ protected:
+  Lines lines_;
 };
 
-}  // namespace encrypt_config
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
diff --git a/libminifi/include/utils/file/FileUtils.h 
b/libminifi/include/utils/file/FileUtils.h
index 721b455..c65473a 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -403,7 +403,7 @@ inline int create_dir(const std::string& path, bool 
recursive = true) {
 #endif
 }
 
-inline int copy_file(const std::string &path_from, const std::string 
dest_path) {
+inline int copy_file(const std::string &path_from, const std::string& 
dest_path) {
   std::ifstream src(path_from, std::ios::binary);
   if (!src.is_open())
     return -1;
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index fc92e36..25c7036 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -43,6 +43,7 @@
 #include "core/ClassLoader.h"
 #include "SchedulingAgent.h"
 #include "core/controller/ControllerServiceProvider.h"
+#include "core/controller/ForwardingControllerServiceProvider.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Connectable.h"
 #include "utils/file/PathUtils.h"
@@ -61,13 +62,12 @@ 
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
                                std::shared_ptr<Configure> configure, 
std::unique_ptr<core::FlowConfiguration> flow_configuration,
                                std::shared_ptr<core::ContentRepository> 
content_repo, const std::string /*name*/, bool headless_mode,
                                std::shared_ptr<utils::file::FileSystem> 
filesystem)
-    : 
core::controller::ControllerServiceProvider(core::getClassName<FlowController>()),
+    : 
core::controller::ForwardingControllerServiceProvider(core::getClassName<FlowController>()),
       c2::C2Client(std::move(configure), std::move(provenance_repo), 
std::move(flow_file_repo),
                    std::move(content_repo), std::move(flow_configuration), 
std::move(filesystem)),
       running_(false),
       updating_(false),
       initialized_(false),
-      
controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
       thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
       logger_(logging::LoggerFactory<FlowController>::getLogger()) {
   if (provenance_repo_ == nullptr)
@@ -212,7 +212,7 @@ int16_t FlowController::stop() {
     this->flow_file_repo_->stop();
     this->provenance_repo_->stop();
     // stop the ControllerServices
-    this->controller_service_provider_->disableAllControllerServices();
+    this->controller_service_provider_impl_->disableAllControllerServices();
     running_ = false;
   }
   return 0;
@@ -253,6 +253,37 @@ void FlowController::unload() {
   }
 }
 
+std::unique_ptr<core::ProcessGroup> FlowController::loadInitialFlow() {
+  std::unique_ptr<core::ProcessGroup> root = flow_configuration_->getRoot();
+  if (root) {
+    return root;
+  }
+  logger_->log_error("Couldn't load flow configuration file, trying to fetch 
it from C2 server");
+  auto opt_flow_url = configuration_->get(Configure::nifi_c2_flow_url);
+  if (!opt_flow_url) {
+    logger_->log_error("No flow configuration url found");
+    return nullptr;
+  }
+  // ensure that C2 connection is up and running
+  // since we don't have access to the flow definition, the C2 communication
+  // won't be able to use the services defined there, e.g. SSLContextService
+  controller_service_provider_impl_ = 
flow_configuration_->getControllerServiceProvider();
+  C2Client::initialize(this, shared_from_this());
+  auto opt_source = fetchFlow(*opt_flow_url);
+  if (!opt_source) {
+    logger_->log_error("Couldn't fetch flow configuration from C2 server");
+    return nullptr;
+  }
+  root = flow_configuration_->updateFromPayload(*opt_flow_url, *opt_source);
+  if (root) {
+    logger_->log_info("Successfully fetched valid flow configuration");
+    if (!flow_configuration_->persist(*opt_source)) {
+      logger_->log_info("Failed to write the fetched flow to disk");
+    }
+  }
+  return root;
+}
+
 void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, 
bool reload) {
   std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (running_) {
@@ -268,12 +299,12 @@ void FlowController::load(const 
std::shared_ptr<core::ProcessGroup> &root, bool
       this->root_ = root;
     } else {
       logger_->log_info("Instantiating new flow");
-      this->root_ = 
std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot());
+      this->root_ = std::shared_ptr<core::ProcessGroup>(loadInitialFlow());
     }
 
     logger_->log_info("Loaded root processor Group");
     logger_->log_info("Initializing timers");
-    controller_service_provider_ = 
flow_configuration_->getControllerServiceProvider();
+    controller_service_provider_impl_ = 
flow_configuration_->getControllerServiceProvider();
     auto base_shared_ptr = 
std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this());
 
     if (!thread_pool_.isRunning() || reload) {
@@ -287,8 +318,8 @@ void FlowController::load(const 
std::shared_ptr<core::ProcessGroup> &root, bool
     conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, 
!event_scheduler_ || reload);
     conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, 
!cron_scheduler_ || reload);
 
-    
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
-    
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
+    
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setRootGroup(root_);
+    
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setSchedulingAgent(
         std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
 
     logger_->log_info("Loaded controller service provider");
@@ -337,7 +368,7 @@ int16_t FlowController::start() {
   } else {
     if (!running_) {
       logger_->log_info("Starting Flow Controller");
-      controller_service_provider_->enableAllControllerServices();
+      controller_service_provider_impl_->enableAllControllerServices();
       this->timer_scheduler_->start();
       this->event_scheduler_->start();
       this->cron_scheduler_->start();
@@ -360,166 +391,6 @@ int16_t FlowController::start() {
   }
 }
 
-/**
- * Controller Service functions
- *
- */
-
-/**
- * Creates a controller service through the controller service provider impl.
- * @param type class name
- * @param id service identifier
- * @param firstTimeAdded first time this CS was added
- */
-std::shared_ptr<core::controller::ControllerServiceNode> 
FlowController::createControllerService(const std::string &type, const 
std::string &fullType, const std::string &id, bool firstTimeAdded) {
-  return controller_service_provider_->createControllerService(type, fullType, 
id, firstTimeAdded);
-}
-
-/**
- * controller service provider
- */
-/**
- * removes controller service
- * @param serviceNode service node to be removed.
- */
-
-void FlowController::removeControllerService(const 
std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-  controller_map_->removeControllerService(serviceNode);
-}
-
-/**
- * Enables the controller service services
- * @param serviceNode service node which will be disabled, along with linked 
services.
- */
-std::future<utils::TaskRescheduleInfo> 
FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
-  return controller_service_provider_->enableControllerService(serviceNode);
-}
-
-/**
- * Enables controller services
- * @param serviceNoden vector of service nodes which will be enabled, along 
with linked services.
- */
-void 
FlowController::enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>
 /*serviceNodes*/) {
-}
-
-/**
- * Disables controller services
- * @param serviceNode service node which will be disabled, along with linked 
services.
- */
-std::future<utils::TaskRescheduleInfo> 
FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
-  return controller_service_provider_->disableControllerService(serviceNode);
-}
-
-/**
- * Removes all controller services.
- */
-void FlowController::clearControllerServices() {
-  controller_service_provider_->clearControllerServices();
-}
-
-/**
- * Gets all controller services.
- */
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
FlowController::getAllControllerServices() {
-  return controller_service_provider_->getAllControllerServices();
-}
-
-/**
- * Gets the controller service for <code>identifier</code>
- * @param identifier service identifier
- * @return shared pointer to teh controller service implementation or nullptr 
if it does not exist.
- */
-std::shared_ptr<core::controller::ControllerService> 
FlowController::getControllerService(const std::string &identifier) {
-  return controller_service_provider_->getControllerService(identifier);
-}
-/**
- * Gets controller service node specified by <code>id</code>
- * @param id service identifier
- * @return shared pointer to the controller service node or nullptr if it does 
not exist.
- */
-std::shared_ptr<core::controller::ControllerServiceNode> 
FlowController::getControllerServiceNode(const std::string &id) const {
-  return controller_service_provider_->getControllerServiceNode(id);
-}
-
-void 
FlowController::verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>&
 /*serviceNode*/) {
-}
-
-/**
- * Unschedules referencing components.
- */
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
FlowController::unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
-  return 
controller_service_provider_->unscheduleReferencingComponents(serviceNode);
-}
-
-/**
- * Verify can disable referencing components
- * @param serviceNode service node whose referenced components will be 
scheduled.
- */
-void 
FlowController::verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
-  
controller_service_provider_->verifyCanDisableReferencingServices(serviceNode);
-}
-
-/**
- * Disables referencing components
- * @param serviceNode service node whose referenced components will be 
scheduled.
- */
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
FlowController::disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
-  return controller_service_provider_->disableReferencingServices(serviceNode);
-}
-
-/**
- * Verify can enable referencing components
- * @param serviceNode service node whose referenced components will be 
scheduled.
- */
-void 
FlowController::verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
-  
controller_service_provider_->verifyCanEnableReferencingServices(serviceNode);
-}
-
-/**
- * Determines if the controller service specified by identifier is enabled.
- */
-bool FlowController::isControllerServiceEnabled(const std::string &identifier) 
{
-  return controller_service_provider_->isControllerServiceEnabled(identifier);
-}
-
-/**
- * Enables referencing components
- * @param serviceNode service node whose referenced components will be 
scheduled.
- */
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
FlowController::enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
-  return controller_service_provider_->enableReferencingServices(serviceNode);
-}
-
-/**
- * Schedules referencing components
- * @param serviceNode service node whose referenced components will be 
scheduled.
- */
-std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> 
FlowController::scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
-  return 
controller_service_provider_->scheduleReferencingComponents(serviceNode);
-}
-
-/**
- * Returns controller service components referenced by serviceIdentifier from 
the embedded
- * controller service provider;
- */
-std::shared_ptr<core::controller::ControllerService> 
FlowController::getControllerServiceForComponent(const std::string 
&serviceIdentifier, const utils::Identifier &componentId) {
-  return 
controller_service_provider_->getControllerServiceForComponent(serviceIdentifier,
 componentId);
-}
-
-/**
- * Enables all controller services for the provider.
- */
-void FlowController::enableAllControllerServices() {
-  controller_service_provider_->enableAllControllerServices();
-}
-
-/**
- * Disables all controller services for the provider.
- */
-void FlowController::disableAllControllerServices() {
-  controller_service_provider_->disableAllControllerServices();
-}
-
 int16_t FlowController::applyUpdate(const std::string &source, const 
std::string &configuration, bool persist) {
   if (applyConfiguration(source, configuration)) {
     if (persist) {
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
deleted file mode 100644
index 0e615f7..0000000
--- a/libminifi/src/Properties.cpp
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "properties/Properties.h"
-#include <string>
-#include "utils/StringUtils.h"
-#include "utils/file/FileUtils.h"
-#include "utils/file/PathUtils.h"
-#include "core/Core.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-#define TRACE_BUFFER_SIZE 512
-
-Properties::Properties(const std::string& name)
-    : logger_(logging::LoggerFactory<Properties>::getLogger()),
-    name_(name) {
-}
-
-// Get the config value
-bool Properties::getString(const std::string &key, std::string &value) const {
-  std::lock_guard<std::mutex> lock(mutex_);
-  auto it = properties_.find(key);
-
-  if (it != properties_.end()) {
-    value = it->second;
-    return true;
-  } else {
-    return false;
-  }
-}
-
-int Properties::getInt(const std::string &key, int default_value) const {
-  std::lock_guard<std::mutex> lock(mutex_);
-  auto it = properties_.find(key);
-
-  return it != properties_.end() ? std::stoi(it->second) : default_value;
-}
-
-// Parse one line in configure file like key=value
-bool Properties::parseConfigureFileLine(char *buf, std::string &prop_key, 
std::string &prop_value) {
-  char *line = buf;
-
-  while ((line[0] == ' ') || (line[0] == '\t'))
-    ++line;
-
-  char first = line[0];
-  if ((first == '\0') || (first == '#') || (first == '[')  || (first == '\r') 
|| (first == '\n') || (first == '=')) {
-    return true;
-  }
-
-  char *equal = strchr(line, '=');
-  if (equal == NULL) {
-    return false;  // invalid property as this is not a comment or property 
line
-  }
-
-  equal[0] = '\0';
-  std::string key = line;
-
-  equal++;
-  while ((equal[0] == ' ') || (equal[0] == '\t'))
-    ++equal;
-
-  first = equal[0];
-  if ((first == '\0') || (first == '\r') || (first == '\n')) {
-    return true;  // empty properties are okay
-  }
-
-  std::string value = equal;
-  value = 
org::apache::nifi::minifi::utils::StringUtils::replaceEnvironmentVariables(value);
-  prop_key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key);
-  prop_value = org::apache::nifi::minifi::utils::StringUtils::trimRight(value);
-  return true;
-}
-
-// Load Configure File
-void Properties::loadConfigureFile(const char *fileName) {
-  if (NULL == fileName) {
-    logger_->log_error("Configuration file path for %s is a nullptr!", 
getName().c_str());
-    return;
-  }
-
-  properties_file_ = 
utils::file::getFullPath(utils::file::FileUtils::concat_path(getHome(), 
fileName));
-
-  logger_->log_info("Using configuration file to load configuration for %s from %s (located at %s)", getName().c_str(), fileName, properties_file_);
-
-  std::ifstream file(properties_file_, std::ifstream::in);
-  if (!file.good()) {
-    logger_->log_error("load configure file failed %s", properties_file_);
-    return;
-  }
-  this->clear();
-
-  char buf[TRACE_BUFFER_SIZE];
-  for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) {
-    std::string key, value;
-    if (parseConfigureFileLine(buf, key, value)) {
-      set(key, value);
-    }
-  }
-  dirty_ = false;
-}
-
-bool Properties::validateConfigurationFile(const std::string &configFile) {
-  std::ifstream file(configFile, std::ifstream::in);
-  if (!file.good()) {
-    logger_->log_error("Failed to load configuration file %s to configure %s", configFile, getName().c_str());
-    return false;
-  }
-
-  char buf[TRACE_BUFFER_SIZE];
-  for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) {
-    std::string key, value;
-    if (!parseConfigureFileLine(buf, key, value)) {
-      logger_->log_error("While loading configuration for %s found invalid line: %s", getName().c_str(), buf);
-      return false;
-    }
-  }
-  return true;
-}
-
-bool Properties::persistProperties() {
-  std::lock_guard<std::mutex> lock(mutex_);
-  if (!dirty_) {
-    logger_->log_info("Attempt to persist, but properties are not updated");
-    return true;
-  }
-  std::ifstream file(properties_file_, std::ifstream::in);
-  if (!file.good()) {
-    logger_->log_error("load configure file failed %s", properties_file_);
-    return false;
-  }
-
-  std::map<std::string, std::string> properties_copy = properties_;
-
-  std::string new_file = properties_file_ + ".new";
-
-  std::ofstream output_file(new_file, std::ios::out);
-
-  char buf[TRACE_BUFFER_SIZE];
-  for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) {
-    char *line = buf;
-
-    char first = line[0];
-
-    if ((first == '\0') || (first == '#') || (first == '[') || (first == '\r') || (first == '\n') || (first == '=')) {
-      // persist comments and newlines
-      output_file << line << std::endl;
-      continue;
-    }
-
-
-
-    char *equal = strchr(line, '=');
-    if (equal == NULL) {
-      output_file << line << std::endl;
-      continue;
-    }
-
-    equal[0] = '\0';
-    std::string key = line;
-
-    equal++;
-    while ((equal[0] == ' ') || (equal[0] == '\t'))
-      ++equal;
-
-    first = equal[0];
-    if ((first == '\0') || (first == '\r') || (first == '\n')) {
-      output_file << line << std::endl;
-      continue;
-    }
-
-    key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key);
-    std::string value = org::apache::nifi::minifi::utils::StringUtils::trimRight(equal);
-    auto hasIt = properties_copy.find(key);
-    if (hasIt != properties_copy.end() && !value.empty()) {
-      output_file << key << "=" << value << std::endl;
-    }
-    properties_copy.erase(key);
-  }
-
-  for (const auto &kv : properties_copy) {
-    if (!kv.first.empty() && !kv.second.empty())
-      output_file << kv.first << "=" << kv.second << std::endl;
-  }
-  output_file.close();
-
-  if (validateConfigurationFile(new_file)) {
-    const std::string backup = properties_file_ + ".bak";
-    if (!utils::file::FileUtils::copy_file(properties_file_, backup) && !utils::file::FileUtils::copy_file(new_file, properties_file_)) {
-      logger_->log_info("Persisted %s", properties_file_);
-      return true;
-    } else {
-      logger_->log_error("Could not update %s", properties_file_);
-    }
-  }
-
-  dirty_ = false;
-
-  return false;
-}
-
-// Parse Command Line
-void Properties::parseCommandLine(int argc, char **argv) {
-  int i;
-  bool keyFound = false;
-  std::string key, value;
-
-  for (i = 1; i < argc; i++) {
-    if (argv[i][0] == '-' && argv[i][1] != '\0') {
-      keyFound = true;
-      key = &argv[i][1];
-      continue;
-    }
-    if (keyFound) {
-      value = argv[i];
-      set(key, value);
-      keyFound = false;
-    }
-  }
-  return;
-}
-
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index eb49f3f..8f25c07 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -707,7 +707,7 @@ utils::TaskRescheduleInfo C2Agent::produce() {
       logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
     }
     catch (...) {
-      logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
+      logger_->log_error("Unknown exception occurred while performing heartbeat.");
     }
   }
 
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 70dd84d..d24fc2a 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -118,12 +118,21 @@ void C2Client::initialize(core::controller::ControllerServiceProvider *controlle
   loadC2ResponseConfiguration("nifi.c2.root.class.definitions");
 
   if (!initialized_) {
+    // C2Agent is initialized once, meaning that a C2-triggered flow/configuration update
+    // might not be equal to a fresh restart
     c2_agent_ = std::unique_ptr<c2::C2Agent>(new c2::C2Agent(controller, update_sink, configuration_, filesystem_));
     c2_agent_->start();
     initialized_ = true;
   }
 }
 
+utils::optional<std::string> C2Client::fetchFlow(const std::string& uri) const {
+  if (!c2_agent_) {
+    return {};
+  }
+  return c2_agent_->fetchFlow(uri);
+}
+
 void C2Client::initializeComponentMetrics() {
   {
     std::lock_guard<std::mutex> guard(metrics_mutex_);
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index 79fe083..a53b669 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -63,7 +63,7 @@ const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e
 std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &type) {
   std::vector<std::string> appenders;
   std::string prefix = type + ".";
-  for (auto const & entry : properties_) {
+  for (auto const & entry : getProperties()) {
     if (entry.first.rfind(prefix, 0) == 0 && entry.first.find(".", prefix.length() + 1) == std::string::npos) {
       appenders.push_back(entry.first);
     }
diff --git a/libminifi/src/properties/Properties.cpp b/libminifi/src/properties/Properties.cpp
new file mode 100644
index 0000000..a062fd0
--- /dev/null
+++ b/libminifi/src/properties/Properties.cpp
@@ -0,0 +1,150 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "properties/Properties.h"
+#include <string>
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
+#include "core/Core.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "properties/PropertiesFile.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+#define TRACE_BUFFER_SIZE 512
+
+Properties::Properties(const std::string& name)
+    : logger_(logging::LoggerFactory<Properties>::getLogger()),
+    name_(name) {
+}
+
+// Get the config value
+bool Properties::getString(const std::string &key, std::string &value) const {
+  std::lock_guard<std::mutex> lock(mutex_);
+  auto it = properties_.find(key);
+
+  if (it != properties_.end()) {
+    value = it->second.value;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+utils::optional<std::string> Properties::getString(const std::string& key) const {
+  std::string result;
+  const bool found = getString(key, result);
+  if (found) {
+    return result;
+  } else {
+    return utils::nullopt;
+  }
+}
+
+int Properties::getInt(const std::string &key, int default_value) const {
+  std::lock_guard<std::mutex> lock(mutex_);
+  auto it = properties_.find(key);
+
+  return it != properties_.end() ? std::stoi(it->second.value) : default_value;
+}
+
+// Load Configure File
+void Properties::loadConfigureFile(const char *fileName) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (fileName == nullptr) {
+    logger_->log_error("Configuration file path for %s is a nullptr!", getName().c_str());
+    return;
+  }
+
+  properties_file_ = utils::file::getFullPath(utils::file::FileUtils::concat_path(getHome(), fileName));
+
+  logger_->log_info("Using configuration file to load configuration for %s from %s (located at %s)",
+                    getName().c_str(), fileName, properties_file_);
+
+  std::ifstream file(properties_file_, std::ifstream::in);
+  if (!file.good()) {
+    logger_->log_error("load configure file failed %s", properties_file_);
+    return;
+  }
+  properties_.clear();
+  for (const auto& line : PropertiesFile{file}) {
+    properties_[line.getKey()] = {utils::StringUtils::replaceEnvironmentVariables(line.getValue()), false};
+  }
+  dirty_ = false;
+}
+
+bool Properties::persistProperties() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (!dirty_) {
+    logger_->log_info("Attempt to persist, but properties are not updated");
+    return true;
+  }
+  std::ifstream file(properties_file_, std::ifstream::in);
+  if (!file) {
+    logger_->log_error("load configure file failed %s", properties_file_);
+    return false;
+  }
+
+  std::string new_file = properties_file_ + ".new";
+
+  PropertiesFile current_content{file};
+  for (const auto& prop : properties_) {
+    if (!prop.second.changed) {
+      continue;
+    }
+    if (current_content.hasValue(prop.first)) {
+      current_content.update(prop.first, prop.second.value);
+    } else {
+      current_content.append(prop.first, prop.second.value);
+    }
+  }
+
+  try {
+    current_content.writeTo(new_file);
+  } catch (const std::exception&) {
+    logger_->log_error("Could not update %s", properties_file_);
+    return false;
+  }
+
+  const std::string backup = properties_file_ + ".bak";
+  if (!utils::file::FileUtils::copy_file(properties_file_, backup) && !utils::file::FileUtils::copy_file(new_file, properties_file_)) {
+    logger_->log_info("Persisted %s", properties_file_);
+    dirty_ = false;
+    return true;
+  }
+
+  logger_->log_error("Could not update %s", properties_file_);
+  return false;
+}
+
+std::map<std::string, std::string> Properties::getProperties() const {
+  std::lock_guard<std::mutex> lock(mutex_);
+  std::map<std::string, std::string> properties;
+  for (const auto& prop : properties_) {
+    properties[prop.first] = prop.second.value;
+  }
+  return properties;
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/src/properties/PropertiesFile.cpp b/libminifi/src/properties/PropertiesFile.cpp
new file mode 100644
index 0000000..be69d92
--- /dev/null
+++ b/libminifi/src/properties/PropertiesFile.cpp
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "properties/PropertiesFile.h"
+
+#include <algorithm>
+#include <fstream>
+#include <utility>
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+PropertiesFile::Line::Line(std::string line) : line_(line) {
+  line = utils::StringUtils::trim(line);
+  if (line.empty() || line[0] == '#') { return; }
+
+  size_t index_of_first_equals_sign = line.find('=');
+  if (index_of_first_equals_sign == std::string::npos) { return; }
+
+  std::string key = utils::StringUtils::trim(line.substr(0, index_of_first_equals_sign));
+  if (key.empty()) { return; }
+
+  key_ = key;
+  value_ = utils::StringUtils::trim(line.substr(index_of_first_equals_sign + 1));
+}
+
+PropertiesFile::Line::Line(std::string key, std::string value)
+  : line_{utils::StringUtils::join_pack(key, "=", value)}, key_{std::move(key)}, value_{std::move(value)} {
+}
+
+bool PropertiesFile::Line::isValidKey(const std::string &key) {
+  return !key.empty();
+}
+
+void PropertiesFile::Line::updateValue(const std::string& value) {
+  auto pos = line_.find('=');
+  if (pos != std::string::npos) {
+    line_.replace(pos + 1, std::string::npos, value);
+    value_ = value;
+  } else {
+    throw std::invalid_argument{"Cannot update value in config line: it does not contain an = sign!"};
+  }
+}
+
+PropertiesFile::PropertiesFile(std::istream& input_stream) {
+  std::string line;
+  while (std::getline(input_stream, line)) {
+    lines_.push_back(Line{line});
+  }
+}
+
+PropertiesFile::Lines::const_iterator PropertiesFile::findKey(const std::string& key) const {
+  if (!Line::isValidKey(key)) {
+    return lines_.cend();
+  }
+  return std::find_if(lines_.cbegin(), lines_.cend(), [&key](const Line& line) {
+    return line.getKey() == key;
+  });
+}
+
+PropertiesFile::Lines::iterator PropertiesFile::findKey(const std::string& key) {
+  if (!Line::isValidKey(key)) {
+    return lines_.end();
+  }
+  return std::find_if(lines_.begin(), lines_.end(), [&key](const Line& line) {
+    return line.getKey() == key;
+  });
+}
+
+bool PropertiesFile::hasValue(const std::string& key) const {
+  return findKey(key) != lines_.end();
+}
+
+utils::optional<std::string> PropertiesFile::getValue(const std::string& key) const {
+  const auto it = findKey(key);
+  if (it != lines_.end()) {
+    return it->getValue();
+  } else {
+    return utils::nullopt;
+  }
+}
+
+void PropertiesFile::update(const std::string& key, const std::string& value) {
+  auto it = findKey(key);
+  if (it != lines_.end()) {
+    it->updateValue(value);
+  } else {
+    throw std::invalid_argument{"Key " + key + " not found in the config file!"};
+  }
+}
+
+void PropertiesFile::insertAfter(const std::string& after_key, const std::string& key, const std::string& value) {
+  auto it = findKey(after_key);
+  if (it != lines_.end()) {
+    ++it;
+    lines_.emplace(it, key, value);
+  } else {
+    throw std::invalid_argument{"Key " + after_key + " not found in the config file!"};
+  }
+}
+
+void PropertiesFile::append(const std::string& key, const std::string& value) {
+  lines_.emplace_back(key, value);
+}
+
+int PropertiesFile::erase(const std::string& key) {
+  if (!Line::isValidKey(key)) {
+    return 0;
+  }
+  auto has_this_key = [&key](const Line& line) { return line.getKey() == key; };
+  auto new_end = std::remove_if(lines_.begin(), lines_.end(), has_this_key);
+  auto num_removed = std::distance(new_end, lines_.end());
+  lines_.erase(new_end, lines_.end());
+  return gsl::narrow<int>(num_removed);
+}
+
+void PropertiesFile::writeTo(const std::string& file_path) const {
+  try {
+    std::ofstream file{file_path};
+    file.exceptions(std::ios::failbit | std::ios::badbit);
+
+    for (const auto& line : lines_) {
+      file << line.getLine() << '\n';
+    }
+  } catch (const std::exception&) {
+    throw std::runtime_error{"Could not write to file " + file_path};
+  }
+}
+
+PropertiesFile::Lines::const_iterator PropertiesFile::begin() const {
+  return lines_.begin();
+}
+
+PropertiesFile::Lines::const_iterator PropertiesFile::end() const {
+  return lines_.end();
+}
+
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/test/resources/TestEmpty.yml b/libminifi/test/resources/TestEmpty.yml
new file mode 100644
index 0000000..a3e1837
--- /dev/null
+++ b/libminifi/test/resources/TestEmpty.yml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+  name: MiNiFi Flow
+Processors: []
+Connections: []
+Remote Processing Groups: []
+Provenance Reporting:

Reply via email to