szaszm commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r408966137



##########
File path: libminifi/test/keyvalue-tests/CMakeLists.txt
##########
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB KEYVALUE_TESTS  "*.cpp")
+
+SET(KEYVALUE_INT_TEST_COUNT 0)
+
+FOREACH(testfile ${KEYVALUE_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}" )
+    target_include_directories(${testfilename} PRIVATE BEFORE 
"${CMAKE_SOURCE_DIR}/libminifi/test/")
+    createTests("${testfilename}")
+    target_wholearchive_library(${testfilename} minifi-standard-processors)
+    if (NOT DISABLE_ROCKSDB)
+        target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+    endif()
+
+    MATH(EXPR KEYVALUE_INT_TEST_COUNT "${KEYVALUE_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+message("-- Finished building ${KEYVALUE_INT_TEST_COUNT} keyvalue controller 
test file(s)...")
+
+add_test(NAME UnorderedMapPersistableKeyValueStoreServiceTest COMMAND 
PersistableKeyValueStoreServiceTest --config-yaml 
"${TEST_RESOURCES}/UnorderedMapPersistableKeyValueStoreServiceTest.yml")
+if (NOT DISABLE_ROCKSDB)
+    add_test(NAME RocksdDbPersistableKeyValueStoreServiceTest COMMAND 
PersistableKeyValueStoreServiceTest --config-yaml 
"${TEST_RESOURCES}/RocksDbPersistableKeyValueStoreServiceTest.yml")
+endif()

Review comment:
       missing line break at the end of the file

##########
File path: libminifi/test/TestBase.cpp
##########
@@ -45,6 +45,23 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> 
content_repo, std::s
       flow_version_(flow_version),
       logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
   stream_factory = 
org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
+  controller_services_ = 
std::make_shared<core::controller::ControllerServiceMap>();
+  controller_services_provider_ = 
std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_,
 nullptr, configuration_);
+  /* Inject the default state provider ahead of ProcessContext to make sure we 
have a unique state directory */
+  if (state_dir == nullptr) {
+    char state_dir_name_template[] = "/tmp/teststate.XXXXXX";
+    state_dir_ = 
utils::file::FileUtils::create_temp_directory(state_dir_name_template);
+  } else {
+    state_dir_ = state_dir;
+  }
+  state_manager_provider_ = 
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_,
 configuration_, state_dir_.c_str());

Review comment:
       We should move the state dir to `/var/tmp` to avoid breaking the tests 
on tmpfs when using rocksdb storage, as described in 
[MINIFICPP-1188](https://issues.apache.org/jira/browse/MINIFICPP-1188).

##########
File path: libminifi/test/integration/IntegrationBase.h
##########
@@ -121,6 +122,11 @@ void IntegrationBase::run(std::string test_file_location) {
 
   core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location);
 
+  auto controller_service_provider = yaml_ptr->getControllerServiceProvider();
+  char state_dir_name_template[] = "/tmp/integrationstate.XXXXXX";
+  state_dir = 
utils::file::FileUtils::create_temp_directory(state_dir_name_template);
+  
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider,
 configuration, state_dir.c_str());

Review comment:
       We should move the state dir to `/var/tmp` to avoid breaking the tests 
on tmpfs when using rocksdb storage, as described in 
[MINIFICPP-1188](https://issues.apache.org/jira/browse/MINIFICPP-1188).

##########
File path: extensions/sftp/processors/ListSFTP.cpp
##########
@@ -507,85 +463,87 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, 
uint64_t size_)
     , size(size_) {
 }
 
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store state to Tracking Timestamps state 
file \"%s\"", tracking_timestamps_state_filename_.c_str());
+bool ListSFTP::persistTrackingTimestampsCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
     return false;
   }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
-  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << 
"\n";
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  state["listing.timestamp"] = 
std::to_string(last_listed_latest_entry_timestamp_);
+  state["processed.timestamp"] = 
std::to_string(last_processed_latest_entry_timestamp_);
   size_t i = 0;
   for (const auto& identifier : latest_identifiers_processed_) {
-    file << "id." << i << "=" << identifier << "\n";
+    state["id." + std::to_string(i)] = identifier;
     ++i;
   }
+  state_manager->set(state);
+  if (!state_manager->persist()) {
+    return false;
+  }
   return true;
 }
 
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", 
tracking_timestamps_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingTimestampsCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {

Review comment:
       Are you open to changing it to `core::ProcessContext&` wherever the API 
allows that? It would simplify the API, stop conveying the wrong intent (see my 
original comment), disallow null (except in arcane cases), and make future 
"de-`shared_ptr`-ification" refactorings easier.

##########
File path: extensions/sql/processors/ExecuteSQL.cpp
##########
@@ -81,14 +81,14 @@ void ExecuteSQL::initialize() {
   setSupportedRelationships( { s_success });
 }
 
-void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
   initOutputFormat(context);
 
   context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
   context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+void ExecuteSQL::processOnTrigger(core::ProcessContext& /*context*/, 
core::ProcessSession& session) {

Review comment:
       Why introduce the new parameter if it's unused and not mandated by the 
interface? (This also applies to PutSQL.)

##########
File path: extensions/sftp/processors/ListSFTP.cpp
##########
@@ -760,140 +713,83 @@ void ListSFTP::listByTrackingTimestamps(
   }
 }
 
-bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const 
std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store Tracking Entities state to state file 
\"%s\"", tracking_entities_state_filename_.c_str());
-    return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
-  file.close();
-
-  std::ofstream json_file(tracking_entities_state_json_filename_);
-  if (!json_file.is_open()) {
-    logger_->log_error("Failed to store Tracking Entities state to state json 
file \"%s\"", tracking_entities_state_json_filename_.c_str());
-    return false;
-  }
-
-  rapidjson::Document entities(rapidjson::kObjectType);
-  rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
-  for (const auto& already_listed_entity : already_listed_entities_) {
-    rapidjson::Value entity(rapidjson::kObjectType);
-    entity.AddMember("timestamp", already_listed_entity.second.timestamp, 
alloc);
-    entity.AddMember("size", already_listed_entity.second.size, alloc);
-    entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), 
alloc), std::move(entity), alloc);
+bool ListSFTP::persistTrackingEntitiesCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = listing_strategy_;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  size_t i = 0;
+  for (const auto &already_listed_entity : already_listed_entities_) {
+    state["entity." + std::to_string(i) + ".name"] = 
already_listed_entity.first;
+    state["entity." + std::to_string(i) + ".timestamp"] = 
std::to_string(already_listed_entity.second.timestamp);
+    state["entity." + std::to_string(i) + ".size"] = 
std::to_string(already_listed_entity.second.size);
+    ++i;
   }
-
-  rapidjson::OStreamWrapper osw(json_file);
-  rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw);
-  entities.Accept(writer);
-
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Entities state file \"%s\"", 
tracking_entities_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingEntitiesCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
-  std::string state_json_state_file;
-
-  std::string line;
-  while (std::getline(file, line)) {
-    size_t separator_pos = line.find('=');
-    if (separator_pos == std::string::npos) {
-      logger_->log_warn("None key-value line found in Tracking Entities state 
file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
-      continue;
-    }
-    std::string key = line.substr(0, separator_pos);
-    std::string value = line.substr(separator_pos + 1);
-    if (key == "hostname") {
-      state_hostname = std::move(value);
-    } else if (key == "username") {
-      state_username = std::move(value);
-    } else if (key == "remote_path") {
-      state_remote_path = std::move(value);
-    } else if (key == "json_state_file") {
-      state_json_state_file = std::move(value);
-    } else {
-      logger_->log_warn("Unknown key found in Tracking Entities state file 
\"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
-    }
-  }
-  file.close();
-
-  if (state_hostname != hostname ||
-      state_username != username ||
-      state_remote_path != remote_path) {
-    logger_->log_error("Tracking Entities state file \"%s\" was created with 
different settings than the current ones, ignoring. "
-                       "Hostname: \"%s\" vs. \"%s\", "
-                       "Username: \"%s\" vs. \"%s\", "
-                       "Remote Path: \"%s\" vs. \"%s\"",
-                       tracking_entities_state_filename_.c_str(),
-                       state_hostname, hostname,
-                       state_username, username,
-                       state_remote_path, remote_path);
-    return false;
-  }
+  std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
 
-  if (state_json_state_file.empty()) {
-    logger_->log_error("Could not found json state file path in Tracking 
Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_debug("Failed to get state from StateManager");

Review comment:
       If we can distinguish these cases, then whenever the state storage is 
present but corrupted, or absent although it is expected, I'd like to log a 
warning rather than a debug/info message.
   The current API doesn't seem to allow for such a distinction, and I'm not 
sure whether modifying it is worth it or even possible.

##########
File path: extensions/sftp/processors/ListSFTP.cpp
##########
@@ -760,140 +708,83 @@ void ListSFTP::listByTrackingTimestamps(
   }
 }
 
-bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const 
std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store Tracking Entities state to state file 
\"%s\"", tracking_entities_state_filename_.c_str());
-    return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
-  file.close();
-
-  std::ofstream json_file(tracking_entities_state_json_filename_);
-  if (!json_file.is_open()) {
-    logger_->log_error("Failed to store Tracking Entities state to state json 
file \"%s\"", tracking_entities_state_json_filename_.c_str());
-    return false;
-  }
-
-  rapidjson::Document entities(rapidjson::kObjectType);
-  rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
-  for (const auto& already_listed_entity : already_listed_entities_) {
-    rapidjson::Value entity(rapidjson::kObjectType);
-    entity.AddMember("timestamp", already_listed_entity.second.timestamp, 
alloc);
-    entity.AddMember("size", already_listed_entity.second.size, alloc);
-    entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), 
alloc), std::move(entity), alloc);
+bool ListSFTP::persistTrackingEntitiesCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = listing_strategy_;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  size_t i = 0;
+  for (const auto &already_listed_entity : already_listed_entities_) {
+    state["entity." + std::to_string(i) + ".name"] = 
already_listed_entity.first;
+    state["entity." + std::to_string(i) + ".timestamp"] = 
std::to_string(already_listed_entity.second.timestamp);
+    state["entity." + std::to_string(i) + ".size"] = 
std::to_string(already_listed_entity.second.size);
+    ++i;
   }
-
-  rapidjson::OStreamWrapper osw(json_file);
-  rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw);
-  entities.Accept(writer);
-
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Entities state file \"%s\"", 
tracking_entities_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingEntitiesCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
-  std::string state_json_state_file;
-
-  std::string line;
-  while (std::getline(file, line)) {
-    size_t separator_pos = line.find('=');
-    if (separator_pos == std::string::npos) {
-      logger_->log_warn("None key-value line found in Tracking Entities state 
file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
-      continue;
-    }
-    std::string key = line.substr(0, separator_pos);
-    std::string value = line.substr(separator_pos + 1);
-    if (key == "hostname") {
-      state_hostname = std::move(value);
-    } else if (key == "username") {
-      state_username = std::move(value);
-    } else if (key == "remote_path") {
-      state_remote_path = std::move(value);
-    } else if (key == "json_state_file") {
-      state_json_state_file = std::move(value);
-    } else {
-      logger_->log_warn("Unknown key found in Tracking Entities state file 
\"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
-    }
-  }
-  file.close();
-
-  if (state_hostname != hostname ||
-      state_username != username ||
-      state_remote_path != remote_path) {
-    logger_->log_error("Tracking Entities state file \"%s\" was created with 
different settings than the current ones, ignoring. "
-                       "Hostname: \"%s\" vs. \"%s\", "
-                       "Username: \"%s\" vs. \"%s\", "
-                       "Remote Path: \"%s\" vs. \"%s\"",
-                       tracking_entities_state_filename_.c_str(),
-                       state_hostname, hostname,
-                       state_username, username,
-                       state_remote_path, remote_path);
-    return false;
-  }
+  std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
 
-  if (state_json_state_file.empty()) {
-    logger_->log_error("Could not found json state file path in Tracking 
Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_debug("Failed to get state from StateManager");
     return false;
   }
-
-  std::ifstream json_file(state_json_state_file);
-  if (!json_file.is_open()) {
-    logger_->log_error("Failed to open entities Tracking Entities state json 
file \"%s\"", state_json_state_file.c_str());
-    return false;
-  }
-
   try {
-    rapidjson::IStreamWrapper isw(json_file);
-    rapidjson::Document d;
-    rapidjson::ParseResult res = d.ParseStream(isw);
-    if (!res) {
-      logger_->log_error("Failed to parse Tracking Entities state json file 
\"%s\"", state_json_state_file.c_str());
-      return false;
-    }
-    if (!d.IsObject()) {
-      logger_->log_error("Tracking Entities state json file \"%s\" root is not 
an object", state_json_state_file.c_str());
-      return false;
-    }
+    state_listing_strategy = state_map.at("listing_strategy");
+    state_hostname = state_map.at("hostname");
+    state_username = state_map.at("username");
+    state_remote_path = state_map.at("remote_path");
 
     std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
-    for (const auto &already_listed_entity : d.GetObject()) {
-      auto it = already_listed_entity.value.FindMember("timestamp");
-      if (it == already_listed_entity.value.MemberEnd() || 
!it->value.IsUint64()) {
-        logger_->log_error("Tracking Entities state json file \"%s\" timestamp 
missing or malformatted for entity \"%s\"",
-            state_json_state_file.c_str(),
-            already_listed_entity.name.GetString());
-        continue;
+    size_t i = 0;
+    while (true) {
+      std::string name;
+      try {
+        name = state_map.at("entity." + std::to_string(i) + ".name");
+      } catch (...) {
+        break;
       }
-      uint64_t timestamp = it->value.GetUint64();
-      it = already_listed_entity.value.FindMember("size");
-      if (it == already_listed_entity.value.MemberEnd() || 
!it->value.IsUint64()) {
-        logger_->log_error("Tracking Entities state json file \"%s\" size 
missing or malformatted for entity \"%s\"",
-                           state_json_state_file.c_str(),
-                           already_listed_entity.name.GetString());
+      try {
+        uint64_t timestamp = std::stoull(state_map.at("entity." + 
std::to_string(i) + ".timestamp"));
+        uint64_t size = std::stoull(state_map.at("entity." + std::to_string(i) 
+ ".size"));
+        new_already_listed_entities.emplace(std::piecewise_construct,
+                                            std::forward_as_tuple(name),
+                                            std::forward_as_tuple(timestamp, 
size));
+      } catch (...) {
         continue;

Review comment:
       Logging a warning/error here would be useful, as the previous code did.

##########
File path: extensions/sftp/processors/ListSFTP.cpp
##########
@@ -507,85 +468,74 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, 
uint64_t size_)
     , size(size_) {
 }
 
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store state to Tracking Timestamps state 
file \"%s\"", tracking_timestamps_state_filename_.c_str());
-    return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
-  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << 
"\n";
+bool ListSFTP::persistTrackingTimestampsCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  state["listing.timestamp"] = 
std::to_string(last_listed_latest_entry_timestamp_);
+  state["processed.timestamp"] = 
std::to_string(last_processed_latest_entry_timestamp_);
   size_t i = 0;
   for (const auto& identifier : latest_identifiers_processed_) {
-    file << "id." << i << "=" << identifier << "\n";
+    state["id." + std::to_string(i)] = identifier;
     ++i;
   }
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", 
tracking_timestamps_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingTimestampsCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
   uint64_t state_listing_timestamp;
   uint64_t state_processed_timestamp;
   std::set<std::string> state_ids;
 
-  std::string line;
-  while (std::getline(file, line)) {
-    size_t separator_pos = line.find('=');
-    if (separator_pos == std::string::npos) {
-      logger_->log_warn("None key-value line found in Tracking Timestamps 
state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), 
line.c_str());
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_info("Found no stored state");
+    return false;
+  }
+  try {
+    state_listing_strategy = state_map.at("listing_strategy");
+    state_hostname = state_map.at("hostname");
+    state_username = state_map.at("username");
+    state_remote_path = state_map.at("remote_path");
+    try {
+      state_listing_timestamp = stoull(state_map.at("listing.timestamp"));
+    } catch (...) {
+      return false;
     }
-    std::string key = line.substr(0, separator_pos);
-    std::string value = line.substr(separator_pos + 1);
-    if (key == "hostname") {
-      state_hostname = std::move(value);
-    } else if (key == "username") {
-      state_username = std::move(value);
-    } else if (key == "remote_path") {
-      state_remote_path = std::move(value);
-    } else if (key == "listing.timestamp") {
-      try {
-        state_listing_timestamp = stoull(value);
-      } catch (...) {
-        logger_->log_error("listing.timestamp is not an uint64 in Tracking 
Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-        return false;
-      }
-    } else if (key == "processed.timestamp") {
-      try {
-        state_processed_timestamp = stoull(value);
-      } catch (...) {
-        logger_->log_error("processed.timestamp is not an uint64 in Tracking 
Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-        return false;
+    try {
+      state_processed_timestamp = stoull(state_map.at("processed.timestamp"));
+    } catch (...) {
+      return false;
+    }

Review comment:
       The inner catch-alls are redundant, since the outer one does the same: 
`return false`.
   I'd prefer to have the log messages restored, which may make nested `try` 
blocks necessary again.

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -232,20 +236,61 @@ void TailFile::parseStateFileLine(char *buf) {
     const auto file = key.substr(strlen(POSITION_STR));
     tail_states_[file].currentTailFilePosition_ = std::stoull(value);
   }
-
-  return;
 }
 
-bool TailFile::recoverState() {
-  std::ifstream file(state_file_.c_str(), std::ifstream::in);
-  if (!file.good()) {
-    logger_->log_error("load state file failed %s", state_file_);
-    return false;
+
+
+bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& 
context) {
+  bool state_load_success = false;
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager_->get(state_map)) {
+    std::map<std::string, TailState> new_tail_states;
+    size_t i = 0;
+    while (true) {
+      std::string name;
+      try {
+        name = state_map.at("file." + std::to_string(i) + ".name");
+      } catch (...) {
+        break;
+      }
+      try {
+        const std::string& current = state_map.at("file." + std::to_string(i) 
+ ".current");
+        uint64_t position = std::stoull(state_map.at("file." + 
std::to_string(i) + ".position"));
+
+        std::string fileLocation, fileName;
+        if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, 
fileName)) {
+          logger_->log_debug("Received path %s, file %s", fileLocation, 
fileName);
+          new_tail_states.emplace(fileName, TailState { fileLocation, 
fileName, position, 0 });
+        } else {
+          new_tail_states.emplace(current, TailState { fileLocation, current, 
position, 0 });
+        }
+      } catch (...) {
+        continue;
+      }
+      ++i;
+    }
+    state_load_success = true;
+    tail_states_ = std::move(new_tail_states);
+    for (const auto& s : tail_states_) {
+      logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, 
s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, 
s.second.currentTailFileModificationTime_);
+    }
+  } else {
+    logger_->log_info("Found no stored state");
   }
-  tail_states_.clear();
-  char buf[BUFFER_SIZE];
-  for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, 
BUFFER_SIZE)) {
-    parseStateFileLine(buf);
+
+  /* We could not get the state form the StateManager, try to migrate the old 
state file if it exists */

Review comment:
       typo: s/form/from/

##########
File path: libminifi/src/core/ConfigurableComponent.cpp
##########
@@ -195,6 +195,19 @@ bool 
ConfigurableComponent::setSupportedProperties(std::set<Property> properties
   return true;
 }
 
+bool ConfigurableComponent::updateSupportedProperties(std::set<Property> 
properties) {
+  if (!canEdit()) {
+    return false;
+  }
+
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+  for (auto item : properties) {
+    properties_[item.getName()] = item;

Review comment:
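       We can avoid copying each property into the loop variable:
   ```
   for (auto& item: properties) {
     const auto name = item.getName();
     properties_[name] = std::move(item);
   }
   ```
   Note that `std::set` elements are always `const`, so `item` binds as 
`const Property&` and `std::move(item)` still copies. Avoiding that second copy 
as well would require `std::set::extract` (C++17) or taking the properties in a 
container with mutable elements (e.g. `std::vector<Property>`).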

##########
File path: 
libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
+    std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider,
+    const std::string& id)
+    : provider_(std::move(provider))
+    , id_(id)
+    , state_valid_(false) {
+  std::string serialized;
+  if (provider_->getImpl(id_, serialized) && 
provider_->deserialize(serialized, state_)) {
+    state_valid_ = true;
+  }
+}
+
+bool 
AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const
 std::unordered_map<std::string, std::string>& kvs) {
+  if (provider_->setImpl(id_, provider_->serialize(kvs))) {
+    state_valid_ = true;
+    state_ = kvs;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool 
AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(std::unordered_map<std::string,
 std::string>& kvs) {
+  if (!state_valid_) {
+    return false;
+  }
+  kvs = state_;
+  return true;
+}
+
+bool 
AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::clear()
 {
+  if (!state_valid_) {
+    return false;
+  }
+  if (provider_->removeImpl(id_)) {
+    state_valid_ = false;
+    state_.clear();
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool 
AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::persist()
 {
+  if (!state_valid_) {
+    return false;
+  }
+  return provider_->persistImpl();
+}
+
+AbstractCoreComponentStateManagerProvider::~AbstractCoreComponentStateManagerProvider()
 {
+}

Review comment:
       It's possible to use `= default;` in .cpp files as well.

##########
File path: 
extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UnorderedMapKeyValueStoreService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const 
std::string& name, const std::string& id)
+    : KeyValueStoreService(name, id)
+    , 
logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger()) {
+}
+
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const 
std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : KeyValueStoreService(name, uuid)
+    , 
logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger()) {
+}
+
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const 
std::string& name, const std::shared_ptr<Configure> &configuration)
+    : KeyValueStoreService(name)
+    , 
logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger())  
{
+  setConfiguration(configuration);
+  initialize();
+}
+
+UnorderedMapKeyValueStoreService::~UnorderedMapKeyValueStoreService() {
+}
+
+bool UnorderedMapKeyValueStoreService::set(const std::string& key, const 
std::string& value) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  map_[key] = value;
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::get(const std::string& key, 
std::string& value) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  auto it = map_.find(key);
+  if (it == map_.end()) {
+    return false;
+  } else {
+    value = it->second;
+    return true;
+  }
+}
+
+bool UnorderedMapKeyValueStoreService::get(std::unordered_map<std::string, 
std::string>& kvs) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  kvs = map_;
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::remove(const std::string& key) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  return map_.erase(key) == 1U;
+}
+
+bool UnorderedMapKeyValueStoreService::clear() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  map_.clear();
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::update(const std::string& key, const 
std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  bool exists = false;
+  std::string value;
+  auto it = map_.find(key);
+  if (it != map_.end()) {
+    exists = true;
+    value = it->second;
+  }
+  try {
+    if (!update_func(exists, value)) {
+      return false;
+    }
+  } catch (...) {
+    return false;
+  }

Review comment:
       I think we shouldn't silently swallow exceptions thrown by the callback. 
If there is a reason to do so here, could you please add a code comment 
explaining why?

##########
File path: extensions/standard-processors/tests/unit/TailFileTests.cpp
##########
@@ -501,7 +481,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", 
"[tailfiletest2]") {
   std::string line1(4097, '\n');
   std::mt19937 gen(std::random_device { }());
   std::generate_n(line1.begin(), 4095, [&]() -> char {
-    return 32 + gen() % (127 - 32);
+  return 32 + gen() % (127 - 32);

Review comment:
       The old indentation looks correct to me.
   
   `std::uniform_int_distribution<int>{ 32, 126 }` (invoked as `dist(gen)`) 
would be a more readable way of generating these printable ASCII characters 
(the same range as `32 + gen() % (127 - 32)`), if you're interested in changing 
this piece of old code.

##########
File path: extensions/sql/processors/QueryDatabaseTable.cpp
##########
@@ -308,19 +240,43 @@ void QueryDatabaseTable::processOnSchedule(const 
core::ProcessContext &context)
   context.getProperty(s_sqlQuery.getName(), sqlQuery_);
   context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
 
-  std::string stateDir;
-  context.getProperty(s_stateDirectory.getName(), stateDir);
-  if (stateDir.empty()) {
-    logger_->log_error("State Directory is empty");
-    return;
+  mapState_.clear();
+
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  pState_ = std::make_unique<State>(tableName_, stateDir, getUUIDStr(), 
logger_);
-  if (!*pState_) {
-    return;
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager_->get(state_map)) {
+    if (state_map[TABLENAME_KEY] != tableName_) {
+      state_manager_->clear();
+    } else {
+      for (auto&& elem : state_map) {
+        if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) {
+          mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), 
std::move(elem.second));
+        }
+      }
+    }
+  } else {
+    // Try to migrate legacy state file
+    std::string stateDir;
+    context.getProperty(s_stateDirectory.getName(), stateDir);
+    if (!stateDir.empty()) {
+      LegacyState legacyState(tableName_, stateDir, getUUIDStr(), logger_);
+      if (legacyState) {
+        mapState_ = legacyState.getStateMap();
+        if (saveState() && state_manager_->persist()) {
+          logger_->log_info("State migration successful");
+          legacyState.moveStateFileToMigrated();
+        } else {
+          logger_->log_warn("Failed to persists migrated state");
+        }
+      } else {
+        logger_->log_warn("Could not migrate state from specified State 
Directory %s", stateDir);
+      }
+    }

Review comment:
       I suggest extracting this into a function: the logic is non-trivial, and 
giving it a name would document what it does. Maybe 
`this->migrateLegacyStateFile(stateDir)`?

##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -435,6 +437,27 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse &resp) {
         update_sink_->drainRepositories();
         C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
         enqueue_c2_response(std::move(response));
+      } else if (resp.name == "corecomponentstate") {

Review comment:
       Could you add a `TODO: untested` comment at the top of the block?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to