This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 545236fd06b613f2d2b38f74ff9ee85df9190d59
Author: Gabor Gyimesi <gamezb...@gmail.com>
AuthorDate: Tue Mar 12 13:51:58 2024 +0100

    MINIFICPP-2298 Make RocksDB options configurable
        - Also set keep_log_file_num default value to 5
    
    Closes #1731
    Signed-off-by: Martin Zink <martinz...@apache.org>
---
 CONFIGURE.md                                       | 22 +++++++++++
 cmake/BundledRocksDB.cmake                         | 14 ++++---
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  8 ++--
 extensions/rocksdb-repos/FlowFileRepository.cpp    |  7 ++--
 extensions/rocksdb-repos/ProvenanceRepository.cpp  |  7 ++--
 .../controllers/RocksDbStateStorage.cpp            |  7 ++--
 .../rocksdb-repos/database/RocksDatabase.cpp       | 12 +++---
 extensions/rocksdb-repos/database/RocksDatabase.h  |  5 ++-
 .../rocksdb-repos/database/RocksDbInstance.cpp     | 29 ++++++++++----
 .../rocksdb-repos/database/RocksDbInstance.h       |  4 +-
 extensions/rocksdb-repos/database/RocksDbUtils.cpp | 25 ++++++++++++
 extensions/rocksdb-repos/database/RocksDbUtils.h   |  3 ++
 .../tests/DBContentRepositoryTests.cpp             |  2 +-
 .../rocksdb-repos/tests/RocksDBStreamTests.cpp     |  2 +-
 extensions/rocksdb-repos/tests/RocksDBTests.cpp    | 38 +++++++++---------
 .../rocksdb-repos/tests/RocksDBUtilsTests.cpp      | 46 ++++++++++++++++++++++
 libminifi/include/properties/Configuration.h       |  5 +++
 libminifi/include/properties/Properties.h          |  1 -
 .../rocksdb/dboptions_equality_operator.patch      | 21 ++++++++++
 19 files changed, 198 insertions(+), 60 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index 16469db58..2df8d695a 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -425,6 +425,28 @@ Rocksdb has an option to run compaction at specific 
intervals not just when need
      nifi.flowfile.repository.rocksdb.compaction.period=2 min
      nifi.database.content.repository.rocksdb.compaction.period=2 min
 
+### Global RocksDB options
+
+There are a few options for RocksDB that are set for all used RocksDB 
databases in MiNiFi:
+- `create_if_missing` is set to `true`
+- `use_direct_io_for_flush_and_compaction` is set to `true`
+- `use_direct_reads` is set to `true`
+- `keep_log_file_num` is set to `5`
+
+Any RocksDB option can be set or overridden using the 
`nifi.global.rocksdb.options.` prefix in the minifi.properties file.
+
+    # in minifi.properties
+    nifi.global.rocksdb.options.keep_log_file_num=7
+    nifi.global.rocksdb.options.atomic_flush=true
+
+RocksDB options can also be overridden for a specific repository using the 
`nifi.<repository>.rocksdb.options.` prefix in the minifi.properties file.
+
+    # in minifi.properties
+    nifi.flowfile.repository.rocksdb.options.keep_log_file_num=3
+    nifi.content.repository.rocksdb.options.atomic_flush=true
+    nifi.provenance.repository.rocksdb.options.use_direct_reads=false
+    
nifi.state.storage.rocksdb.options.use_direct_io_for_flush_and_compaction=false
+
 #### Shared database
 
 It is also possible to use a single database to store multiple repositories 
with the `minifidb://` scheme.
diff --git a/cmake/BundledRocksDB.cmake b/cmake/BundledRocksDB.cmake
index f1dc063d3..557d44df0 100644
--- a/cmake/BundledRocksDB.cmake
+++ b/cmake/BundledRocksDB.cmake
@@ -27,9 +27,12 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR)
     endif()
 
     # Patch to fix build issue on ARM7 architecture: 
https://github.com/facebook/rocksdb/issues/8609#issuecomment-1009572506
-    set(PATCH_FILE "${SOURCE_DIR}/thirdparty/rocksdb/arm7.patch")
-    set(PC ${Bash_EXECUTABLE} -c "set -x && \
-            (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i 
\"${PATCH_FILE}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE}\")")
+    set(PATCH_FILE_1 "${SOURCE_DIR}/thirdparty/rocksdb/arm7.patch")
+    set(PATCH_FILE_2 
"${SOURCE_DIR}/thirdparty/rocksdb/dboptions_equality_operator.patch")
+    set(PC ${Bash_EXECUTABLE} -c "set -x &&\
+            (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i 
\"${PATCH_FILE_1}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE_1}\") 
&&\
+            (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i 
\"${PATCH_FILE_2}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE_2}\") ")
+
     # Define byproducts
     if (WIN32)
         set(BYPRODUCT "lib/rocksdb.lib")
@@ -49,7 +52,6 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR)
             -DUSE_RTTI=1
             -DROCKSDB_BUILD_SHARED=OFF
             -DFAIL_ON_WARNINGS=OFF
-            -DCMAKE_CXX_STANDARD=17  # RocksDB fails to build in C++20 mode on 
GCC 11: https://godbolt.org/z/YeMcEzs8W
             )
     if(PORTABLE)
         list(APPEND ROCKSDB_CMAKE_ARGS -DPORTABLE=ON)
@@ -71,8 +73,8 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR)
     # Build project
     ExternalProject_Add(
             rocksdb-external
-            URL 
"https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz"
-            URL_HASH 
"SHA256=9102704e169cfb53e7724a30750eeeb3e71307663852f01fa08d5a320e6155a8"
+            URL 
"https://github.com/facebook/rocksdb/archive/refs/tags/v8.10.2.tar.gz"
+            URL_HASH 
"SHA256=44b6ec2f4723a0d495762da245d4a59d38704e0d9d3d31c45af4014bee853256"
             SOURCE_DIR "${BINARY_DIR}/thirdparty/rocksdb-src"
             CMAKE_ARGS ${ROCKSDB_CMAKE_ARGS}
             PATCH_COMMAND ${PC}
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 65781fef9..b1cd9859c 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -56,10 +56,7 @@ bool DatabaseContentRepository::initialize(const 
std::shared_ptr<minifi::Configu
   setCompactionPeriod(configuration);
 
   auto set_db_opts = [encrypted_env] 
(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
-    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
-    db_opts.set(&rocksdb::DBOptions::error_if_exists, false);
+    minifi::internal::setCommonRocksDbOptions(db_opts);
     if (encrypted_env) {
       db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
EncryptionEq{});
     } else {
@@ -74,7 +71,8 @@ bool DatabaseContentRepository::initialize(const 
std::shared_ptr<minifi::Configu
       cf_opts.compression = *compression_type;
     }
   };
-  db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_);
+  db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_,
+    minifi::internal::getRocksDbOptionsToOverride(configuration, 
Configure::nifi_content_repository_rocksdb_options));
   if (db_->open()) {
     logger_->log_debug("NiFi Content DB Repository database open {} success", 
directory_);
     is_valid_ = true;
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp 
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 67f08cba5..ca81c795a 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -185,9 +185,7 @@ bool FlowFileRepository::initialize(const 
std::shared_ptr<Configure> &configure)
   logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" 
: "plaintext");
 
   auto db_options = [encrypted_env] 
(minifi::internal::Writable<rocksdb::DBOptions>& options) {
-    options.set(&rocksdb::DBOptions::create_if_missing, true);
-    options.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
-    options.set(&rocksdb::DBOptions::use_direct_reads, true);
+    minifi::internal::setCommonRocksDbOptions(options);
     if (encrypted_env) {
       options.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
EncryptionEq{});
     } else {
@@ -210,7 +208,8 @@ bool FlowFileRepository::initialize(const 
std::shared_ptr<Configure> &configure)
       cf_opts.compression = *compression_type;
     }
   };
-  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, 
directory_);
+  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, 
directory_,
+    minifi::internal::getRocksDbOptionsToOverride(configure, 
Configure::nifi_flowfile_repository_rocksdb_options));
   if (db_->open()) {
     logger_->log_debug("NiFi FlowFile Repository database open {} success", 
directory_);
     return true;
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp 
b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index 42bf840fa..68f404fe3 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -40,9 +40,7 @@ bool ProvenanceRepository::initialize(const 
std::shared_ptr<org::apache::nifi::m
   logger_->log_debug("MiNiFi Provenance Max Storage Time: [{}]", 
max_partition_millis_);
 
   auto db_options = [] (minifi::internal::Writable<rocksdb::DBOptions>& 
db_opts) {
-    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
+    minifi::internal::setCommonRocksDbOptions(db_opts);
   };
 
   // Rocksdb write buffers act as a log of database operation: grow till 
reaching the limit, serialized after
@@ -60,7 +58,8 @@ bool ProvenanceRepository::initialize(const 
std::shared_ptr<org::apache::nifi::m
     }
   };
 
-  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, 
directory_);
+  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, 
directory_,
+    minifi::internal::getRocksDbOptionsToOverride(config, 
Configure::nifi_provenance_repository_rocksdb_options));
   if (db_->open()) {
     logger_->log_debug("MiNiFi Provenance Repository database open {} 
success", directory_);
   } else {
diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp 
b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
index a07e1a451..965165ced 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
+++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
@@ -67,9 +67,7 @@ void RocksDbStateStorage::onEnable() {
   logger_->log_info("Using {} RocksDbStateStorage", encrypted_env ? 
"encrypted" : "plaintext");
 
   auto set_db_opts = [encrypted_env] (internal::Writable<rocksdb::DBOptions>& 
db_opts) {
-    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
+    minifi::internal::setCommonRocksDbOptions(db_opts);
     if (encrypted_env) {
       db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
core::repository::EncryptionEq{});
     } else {
@@ -81,7 +79,8 @@ void RocksDbStateStorage::onEnable() {
     cf_opts.write_buffer_size = 8ULL << 20U;
     cf_opts.min_write_buffer_number_to_merge = 1;
   };
-  db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_);
+  db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_,
+    minifi::internal::getRocksDbOptionsToOverride(configuration_, 
Configure::nifi_state_storage_rocksdb_options));
   if (db_->open()) {
     logger_->log_trace("Successfully opened RocksDB database at {}", 
directory_.c_str());
   } else {
diff --git a/extensions/rocksdb-repos/database/RocksDatabase.cpp 
b/extensions/rocksdb-repos/database/RocksDatabase.cpp
index 78d31db18..c4a166a6b 100644
--- a/extensions/rocksdb-repos/database/RocksDatabase.cpp
+++ b/extensions/rocksdb-repos/database/RocksDatabase.cpp
@@ -29,7 +29,8 @@ namespace org::apache::nifi::minifi::internal {
 
 std::shared_ptr<core::logging::Logger> RocksDatabase::logger_ = 
core::logging::LoggerFactory<RocksDatabase>::getLogger();
 
-std::unique_ptr<RocksDatabase> RocksDatabase::create(const DBOptionsPatch& 
db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, const 
std::string& uri, RocksDbMode mode) {
+std::unique_ptr<RocksDatabase> RocksDatabase::create(const DBOptionsPatch& 
db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, const 
std::string& uri,
+    const std::unordered_map<std::string, std::string>& db_config_override, 
RocksDbMode mode) {
   const std::string scheme = "minifidb://";
 
   logger_->log_info("Acquiring database handle '{}'", uri);
@@ -56,7 +57,7 @@ std::unique_ptr<RocksDatabase> RocksDatabase::create(const 
DBOptionsPatch& db_op
 
   if (mode == RocksDbMode::ReadOnly) {
     // no need to cache anything with read-only databases
-    return 
std::make_unique<RocksDatabase>(std::make_shared<RocksDbInstance>(db_path, 
mode), db_column, db_options_patch, cf_options_patch);
+    return 
std::make_unique<RocksDatabase>(std::make_shared<RocksDbInstance>(db_path, 
mode), db_column, db_options_patch, cf_options_patch, db_config_override);
   }
 
   static std::mutex mtx;
@@ -74,12 +75,13 @@ std::unique_ptr<RocksDatabase> RocksDatabase::create(const 
DBOptionsPatch& db_op
       logger_->log_info("Using previously opened rocksdb instance '{}'", 
db_path);
     }
   }
-  return std::make_unique<RocksDatabase>(instance, db_column, 
db_options_patch, cf_options_patch);
+  return std::make_unique<RocksDatabase>(instance, db_column, 
db_options_patch, cf_options_patch, db_config_override);
 }
 
-RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string 
column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch)
+RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string 
column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch,
+  const std::unordered_map<std::string, std::string>& db_config_override)
     : column_(std::move(column)), db_(std::move(db)) {
-  db_->registerColumnConfig(column_, db_options_patch, cf_options_patch);
+  db_->registerColumnConfig(column_, db_options_patch, cf_options_patch, 
db_config_override);
 }
 
 RocksDatabase::~RocksDatabase() {
diff --git a/extensions/rocksdb-repos/database/RocksDatabase.h 
b/extensions/rocksdb-repos/database/RocksDatabase.h
index acaa3a6d4..7bcb229e9 100644
--- a/extensions/rocksdb-repos/database/RocksDatabase.h
+++ b/extensions/rocksdb-repos/database/RocksDatabase.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <optional>
 #include <string>
+#include <unordered_map>
 
 #include "rocksdb/db.h"
 #include "logging/Logger.h"
@@ -40,9 +41,11 @@ class RocksDatabase {
   static std::unique_ptr<RocksDatabase> create(const DBOptionsPatch& 
db_options_patch,
                                                const ColumnFamilyOptionsPatch& 
cf_options_patch,
                                                const std::string& uri,
+                                               const 
std::unordered_map<std::string, std::string>& db_config_override,
                                                RocksDbMode mode = 
RocksDbMode::ReadWrite);
 
-  RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const 
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch);
+  RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const 
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch,
+    const std::unordered_map<std::string, std::string>& db_config_override);
 
   RocksDatabase(const RocksDatabase&) = delete;
   RocksDatabase(RocksDatabase&&) = delete;
diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.cpp 
b/extensions/rocksdb-repos/database/RocksDbInstance.cpp
index 0bab95c1d..4c23e0ec8 100644
--- a/extensions/rocksdb-repos/database/RocksDbInstance.cpp
+++ b/extensions/rocksdb-repos/database/RocksDbInstance.cpp
@@ -42,9 +42,11 @@ void RocksDbInstance::invalidate(const 
std::lock_guard<std::mutex>&) {
   impl_.reset();
 }
 
-void RocksDbInstance::registerColumnConfig(const std::string& column, const 
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch) {
+void RocksDbInstance::registerColumnConfig(const std::string& column, const 
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch,
+    const std::unordered_map<std::string, std::string>& db_config_override) {
   std::lock_guard<std::mutex> db_guard{mtx_};
   logger_->log_trace("Registering column '{}' in database '{}'", column, 
db_name_);
+  db_config_override_ = db_config_override;
   auto [_, inserted] = column_configs_.insert({column, ColumnConfig{.dbo_patch 
= db_options_patch, .cfo_patch = cf_options_patch}});
   if (!inserted) {
     throw std::runtime_error("Configuration is already registered for column 
'" + column + "'");
@@ -60,8 +62,14 @@ void RocksDbInstance::registerColumnConfig(const 
std::string& column, const DBOp
       Writable<rocksdb::DBOptions> db_opts_writer(db_opts_copy);
       if (db_options_patch) {
         db_options_patch(db_opts_writer);
-        if (db_opts_writer.isModified()) {
-          logger_->log_trace("Requested a difference DBOptions than the one 
that was used to open the database");
+        rocksdb::ConfigOptions conf_options;
+        conf_options.sanity_level = 
rocksdb::ConfigOptions::kSanityLevelLooselyCompatible;
+        auto db_config_override_status = 
rocksdb::GetDBOptionsFromMap(conf_options, db_opts_copy, db_config_override_, 
&db_opts_copy);
+        if (!db_config_override_status.ok()) {
+          throw std::runtime_error("Failed to override RocksDB options from 
minifi.properties file: " + db_config_override_status.ToString());
+        }
+        if (db_opts_copy != db_options_) {
+          logger_->log_trace("Requested a different DBOptions than the one 
that was used to open the database");
           return true;
         }
       }
@@ -130,7 +138,7 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const 
std::string& column) {
     }
     db_options_ = rocksdb::DBOptions{};
     std::vector<rocksdb::ColumnFamilyDescriptor> cf_descriptors;
-    rocksdb::Status option_status = rocksdb::LoadLatestOptions(conf_options, 
db_name_, &db_options_, &cf_descriptors);
+    rocksdb::Status latest_option_status = 
rocksdb::LoadLatestOptions(conf_options, db_name_, &db_options_, 
&cf_descriptors);
     {
       // apply the database options patchers
       Writable<rocksdb::DBOptions> db_options_writer(db_options_);
@@ -148,14 +156,19 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const 
std::string& column) {
         }
       }
     }
-    if (option_status.ok()) {
+    auto db_config_override_status = 
rocksdb::GetDBOptionsFromMap(conf_options, db_options_, db_config_override_, 
&db_options_);
+    if (!db_config_override_status.ok()) {
+      logger_->log_error("Failed to override RocksDB options from 
minifi.properties file: {}", db_config_override_status.ToString());
+      return std::nullopt;
+    }
+    if (latest_option_status.ok()) {
       logger_->log_trace("Found existing database '{}', checking 
compatibility", db_name_);
       rocksdb::Status compat_status = 
rocksdb::CheckOptionsCompatibility(conf_options, db_name_, db_options_, 
cf_descriptors);
       if (!compat_status.ok()) {
         logger_->log_error("Incompatible database options: {}", 
compat_status.ToString());
         return std::nullopt;
       }
-    } else if (option_status.IsNotFound()) {
+    } else if (latest_option_status.IsNotFound()) {
       logger_->log_trace("Database at '{}' not found, creating", db_name_);
       rocksdb::ColumnFamilyOptions default_cf_options;
       if (auto it = column_configs_.find("default"); it != 
column_configs_.end()) {
@@ -164,8 +177,8 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const 
std::string& column) {
         }
       }
       cf_descriptors.emplace_back("default", default_cf_options);
-    } else if (!option_status.ok()) {
-      logger_->log_error("Couldn't query database '{}' for options: '{}'", 
db_name_, option_status.ToString());
+    } else if (!latest_option_status.ok()) {
+      logger_->log_error("Couldn't query database '{}' for options: '{}'", 
db_name_, latest_option_status.ToString());
       return std::nullopt;
     }
     std::vector<rocksdb::ColumnFamilyHandle*> column_handles;
diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.h 
b/extensions/rocksdb-repos/database/RocksDbInstance.h
index 182324958..b4796c4eb 100644
--- a/extensions/rocksdb-repos/database/RocksDbInstance.h
+++ b/extensions/rocksdb-repos/database/RocksDbInstance.h
@@ -61,11 +61,13 @@ class RocksDbInstance {
 
   void invalidate(const std::lock_guard<std::mutex>&);
 
-  void registerColumnConfig(const std::string& column, const DBOptionsPatch& 
db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch);
+  void registerColumnConfig(const std::string& column, const DBOptionsPatch& 
db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch,
+    const std::unordered_map<std::string, std::string>& db_config_override);
   void unregisterColumnConfig(const std::string& column);
 
   rocksdb::DBOptions db_options_;
   const std::string db_name_;
+  std::unordered_map<std::string, std::string> db_config_override_;
   const RocksDbMode mode_;
 
   std::mutex mtx_;
diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.cpp 
b/extensions/rocksdb-repos/database/RocksDbUtils.cpp
index d3834dfef..223cbb27e 100644
--- a/extensions/rocksdb-repos/database/RocksDbUtils.cpp
+++ b/extensions/rocksdb-repos/database/RocksDbUtils.cpp
@@ -51,4 +51,29 @@ std::optional<rocksdb::CompressionType> 
readConfiguredCompressionType(const std:
 #endif
 }
 
+
+void setCommonRocksDbOptions(Writable<rocksdb::DBOptions>& db_opts) {
+  db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
+  db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
+  db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
+  db_opts.set(&rocksdb::DBOptions::keep_log_file_num, 5);
+}
+
+std::unordered_map<std::string, std::string> getRocksDbOptionsToOverride(const 
std::shared_ptr<Configure> &configuration, std::string_view custom_db_prefix) {
+  std::unordered_map<std::string, std::string> options;
+  const auto addOverrideOptions = [&configuration, &options](std::string_view 
prefix) {
+    if (prefix.empty()) {
+      return;
+    }
+    for (const auto& [key, value] : configuration->getProperties()) {
+      if (key.starts_with(prefix)) {
+        options[key.substr(prefix.size())] = value;
+      }
+    }
+  };
+  addOverrideOptions(Configuration::nifi_global_rocksdb_options);
+  addOverrideOptions(custom_db_prefix);
+  return options;
+}
+
 }  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h 
b/extensions/rocksdb-repos/database/RocksDbUtils.h
index b5748c117..0deac2c85 100644
--- a/extensions/rocksdb-repos/database/RocksDbUtils.h
+++ b/extensions/rocksdb-repos/database/RocksDbUtils.h
@@ -22,6 +22,7 @@
 #include <optional>
 #include <memory>
 #include <string>
+#include <unordered_map>
 
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
@@ -71,5 +72,7 @@ using DBOptionsPatch = 
std::function<void(Writable<rocksdb::DBOptions>&)>;
 using ColumnFamilyOptionsPatch = 
std::function<void(rocksdb::ColumnFamilyOptions&)>;
 
 std::optional<rocksdb::CompressionType> readConfiguredCompressionType(const 
std::shared_ptr<Configure> &configuration, const std::string& config_key);
+void setCommonRocksDbOptions(Writable<rocksdb::DBOptions>& db_opts);
+std::unordered_map<std::string, std::string> getRocksDbOptionsToOverride(const 
std::shared_ptr<Configure> &configuration, std::string_view custom_db_prefix);
 
 }  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp 
b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
index d46601930..5f9eb7b25 100644
--- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
+++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
@@ -330,7 +330,7 @@ TEST_CASE("ProcessSession::read can read zero length 
flowfiles without crash (Ro
 }
 
 size_t getDbSize(const std::filesystem::path& dir) {
-  auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string());
+  auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string(), {});
   auto opendb = db->open();
   REQUIRE(opendb);
 
diff --git a/extensions/rocksdb-repos/tests/RocksDBStreamTests.cpp 
b/extensions/rocksdb-repos/tests/RocksDBStreamTests.cpp
index 4b8e2563d..e9433e576 100644
--- a/extensions/rocksdb-repos/tests/RocksDBStreamTests.cpp
+++ b/extensions/rocksdb-repos/tests/RocksDBStreamTests.cpp
@@ -34,7 +34,7 @@ class RocksDBStreamTest : TestController {
     auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
       cf_opts.merge_operator = 
std::make_shared<core::repository::StringAppender>();
     };
-    db = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
dbPath);
+    db = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
dbPath, {});
     REQUIRE(db->open());
   }
 
diff --git a/extensions/rocksdb-repos/tests/RocksDBTests.cpp 
b/extensions/rocksdb-repos/tests/RocksDBTests.cpp
index 51cea8e7c..8a923a051 100644
--- a/extensions/rocksdb-repos/tests/RocksDBTests.cpp
+++ b/extensions/rocksdb-repos/tests/RocksDBTests.cpp
@@ -70,7 +70,7 @@ void 
new_db_opts(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
 }
 
 TEST_CASE_METHOD(RocksDBTest, "Malformed database uri - Missing column name", 
"[rocksDBTest1]") {
-  auto db = minifi::internal::RocksDatabase::create({}, {}, 
"minifidb://malformed");
+  auto db = minifi::internal::RocksDatabase::create({}, {}, 
"minifidb://malformed", {});
   REQUIRE(!db);
   REQUIRE(utils::verifyLogLinePresenceInPollTime(
       std::chrono::seconds{1}, "Couldn't detect the column name in 
'minifidb://malformed'"));
@@ -78,7 +78,7 @@ TEST_CASE_METHOD(RocksDBTest, "Malformed database uri - 
Missing column name", "[
 
 TEST_CASE_METHOD(RocksDBTest, "Can write to default column", "[rocksDBTest2]") 
{
   {
-    auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir);
+    auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir, 
{});
     auto opendb = db->open();
     opendb->Put(rocksdb::WriteOptions{}, "fruit", "apple");
   }
@@ -92,7 +92,7 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to default column", 
"[rocksDBTest2]") {
 
 TEST_CASE_METHOD(RocksDBTest, "Can write to specific column using the rocksdb 
uri scheme", "[rocksDBTest3]") {
   {
-    auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one");
+    auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one", {});
     auto opendb = db->open();
     opendb->Put(rocksdb::WriteOptions{}, "fruit", "apple");
   }
@@ -106,11 +106,11 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to specific 
column using the rocksdb ur
 
 TEST_CASE_METHOD(RocksDBTest, "Can write to two specific columns at once", 
"[rocksDBTest4]") {
   {
-    auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one");
+    auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one", {});
     auto opendb1 = db1->open();
     REQUIRE(opendb1);
     opendb1->Put(rocksdb::WriteOptions{}, "fruit", "apple");
-    auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_two");
+    auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_two", {});
     auto opendb2 = db2->open();
     REQUIRE(opendb2);
     opendb2->Put(rocksdb::WriteOptions{}, "animal", "penguin");
@@ -128,11 +128,11 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to two specific 
columns at once", "[roc
 
 TEST_CASE_METHOD(RocksDBTest, "Can write to the default and a specific column 
at once", "[rocksDBTest5]") {
   {
-    auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one");
+    auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one", {});
     auto opendb1 = db1->open();
     REQUIRE(opendb1);
     opendb1->Put(rocksdb::WriteOptions{}, "fruit", "apple");
-    auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
db_dir);
+    auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
db_dir, {});
     auto opendb2 = db2->open();
     REQUIRE(opendb2);
     opendb2->Put(rocksdb::WriteOptions{}, "animal", "penguin");
@@ -149,13 +149,13 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to the default 
and a specific column at
 }
 
 TEST_CASE_METHOD(RocksDBTest, "Logged if the options are incompatible with an 
existing column family", "[rocksDBTest6]") {
-  auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one");
+  auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one", {});
   REQUIRE(db->open());
   // implicitly created the "default" column family, but with the default 
options
   auto cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
     cf_opts.merge_operator = 
std::make_shared<minifi::core::repository::StringAppender>();
   };
-  auto default_db = minifi::internal::RocksDatabase::create(new_db_opts, 
cf_opts, "minifidb://" + db_dir + "/default");
+  auto default_db = minifi::internal::RocksDatabase::create(new_db_opts, 
cf_opts, "minifidb://" + db_dir + "/default", {});
   REQUIRE(default_db->open());
   REQUIRE(utils::verifyLogLinePresenceInPollTime(
       std::chrono::seconds{1}, "Could not determine if we definitely need to 
reopen or we are definitely safe, requesting reopen"));
@@ -170,16 +170,16 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if 
different DBOptions are used",
     db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
     db_opts.set(&rocksdb::DBOptions::manual_wal_flush, true);
   };
-  auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, 
"minifidb://" + db_dir + "/column_one");
+  auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, 
"minifidb://" + db_dir + "/column_one", {});
   REQUIRE(col_1->open());
-  auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, 
"minifidb://" + db_dir + "/column_two");
+  auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, 
"minifidb://" + db_dir + "/column_two", {});
   REQUIRE_FALSE(col_2->open());
   REQUIRE(utils::verifyLogLinePresenceInPollTime(
       std::chrono::seconds{1}, "Conflicting database options requested for '" 
+ db_dir + "'"));
 }
 
 TEST_CASE_METHOD(RocksDBTest, "Sanity check: merge fails without 
merge_operator", "[rocksDBTest8]") {
-  auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/col_one");
+  auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/col_one", {});
   REQUIRE(db);
 
   auto opendb = db->open();
@@ -203,7 +203,7 @@ TEST_CASE_METHOD(RocksDBTest, "Column options are applied", 
"[rocksDBTest9]") {
   SECTION("Implicit default column") {
     db_uri = db_dir;
   }
-  auto db = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, 
db_uri);
+  auto db = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, 
db_uri, {});
   REQUIRE(db);
 
   auto opendb = db->open();
@@ -238,25 +238,25 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if different encryption keys are
     << "encryption.key.two=" << "905D7B95EF44DC27C87FFBC4DFDE376DAE604D55DB2C5496DEEF5236362DE62E" << "\n";
 
   auto db_opt_1 = createEncrSetter(home_dir, "one", "encryption.key.one");
-  auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, "minifidb://" + db_dir + "/column_one");
+  auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, "minifidb://" + db_dir + "/column_one", {});
   REQUIRE(col_1->open());
 
   SECTION("Using the same encryption key is OK") {
     auto db_opt_2 = createEncrSetter(home_dir, "two", "encryption.key.one");
-    auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two");
+    auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two", {});
     REQUIRE(col_2->open());
   }
 
   SECTION("Using different encryption key") {
     auto db_opt_2 = createEncrSetter(home_dir, "two", "encryption.key.two");
-    auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two");
+    auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two", {});
     REQUIRE_FALSE(col_2->open());
     REQUIRE(utils::verifyLogLinePresenceInPollTime(
         std::chrono::seconds{1}, "Conflicting database options requested for '" + db_dir + "'"));
   }
 
   SECTION("Using no encryption key") {
-    auto col_2 = minifi::internal::RocksDatabase::create(withDefaultEnv, {}, "minifidb://" + db_dir + "/column_two");
+    auto col_2 = minifi::internal::RocksDatabase::create(withDefaultEnv, {}, "minifidb://" + db_dir + "/column_two", {});
     REQUIRE_FALSE(col_2->open());
     REQUIRE(utils::verifyLogLinePresenceInPollTime(
         std::chrono::seconds{1}, "Conflicting database options requested for '" + db_dir + "'"));
@@ -265,7 +265,7 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if different encryption keys are
 
 TEST_CASE_METHOD(RocksDBTest, "RocksDb works correctly on changed (but compatible) ColumnFamilyOptions change", "[rocksDBTest11]") {
   {
-    auto col1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir);
+    auto col1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir, {});
     auto opendb = col1->open();
     REQUIRE(opendb);
     REQUIRE(opendb->Put({}, "fruit", "apple").ok());
@@ -275,7 +275,7 @@ TEST_CASE_METHOD(RocksDBTest, "RocksDb works correctly on changed (but compatibl
     cf_opts.OptimizeForPointLookup(4);
   };
 
-  auto col2 = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, db_dir);
+  auto col2 = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, db_dir, {});
   REQUIRE(col2);
   {
     auto opendb = col2->open();
diff --git a/extensions/rocksdb-repos/tests/RocksDBUtilsTests.cpp b/extensions/rocksdb-repos/tests/RocksDBUtilsTests.cpp
new file mode 100644
index 000000000..9b7f662f5
--- /dev/null
+++ b/extensions/rocksdb-repos/tests/RocksDBUtilsTests.cpp
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unordered_map>
+#include <string>
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "../database/RocksDbUtils.h"
+#include "properties/Configure.h"
+
+namespace org::apache::nifi::minifi::test {
+
+TEST_CASE("Get global RocksDB options to override", "[rocksdbutils]") {
+  std::unordered_map<std::string, std::string> expected_options {
+    {"keep_log_file_num", "2"},
+    {"table_cache_numshardbits", "456"},
+    {"create_if_missing", "false"}
+  };
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set("nifi.c2.rest.url", "http://localhost:8080");
+  config->set("nifi.global.rocksdb.options.table_cache_numshardbits", "456");
+  config->set("nifi.global.rocksdb.options.keep_log_file_num", "123");
+  config->set("nifi.flowfile.repository.rocksdb.options.keep_log_file_num", "2");
+  config->set("nifi.flowfile.repository.rocksdb.options.create_if_missing", "false");
+  config->set("nifi.c2.enable", "true");
+  auto result = org::apache::nifi::minifi::internal::getRocksDbOptionsToOverride(config, minifi::Configure::nifi_flowfile_repository_rocksdb_options);
+  REQUIRE(result == expected_options);
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 579adca34..e50bc3115 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -36,6 +36,11 @@ class Configuration : public Properties {
   Configuration() : Properties("MiNiFi configuration") {}
 
   static constexpr const char *nifi_volatile_repository_options = "nifi.volatile.repository.options.";
+  static constexpr const char *nifi_global_rocksdb_options = "nifi.global.rocksdb.options.";
+  static constexpr const char *nifi_flowfile_repository_rocksdb_options = "nifi.flowfile.repository.rocksdb.options.";
+  static constexpr const char *nifi_content_repository_rocksdb_options = "nifi.content.repository.rocksdb.options.";
+  static constexpr const char *nifi_provenance_repository_rocksdb_options = "nifi.provenance.repository.rocksdb.options.";
+  static constexpr const char *nifi_state_storage_rocksdb_options = "nifi.state.storage.rocksdb.options.";
 
   // nifi.flow.configuration.file
   static constexpr const char *nifi_default_directory = "nifi.default.directory";
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index e86ec4b5e..6d427d64f 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -139,7 +139,6 @@ class Properties {
 
   std::filesystem::path getFilePath() const;
 
- protected:
   std::map<std::string, std::string> getProperties() const;
 
  private:
diff --git a/thirdparty/rocksdb/dboptions_equality_operator.patch b/thirdparty/rocksdb/dboptions_equality_operator.patch
new file mode 100644
index 000000000..2efb91ea0
--- /dev/null
+++ b/thirdparty/rocksdb/dboptions_equality_operator.patch
@@ -0,0 +1,21 @@
+diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
+index ae5ed2c26..0038c6bff 100644
+--- a/include/rocksdb/options.h
++++ b/include/rocksdb/options.h
+@@ -397,6 +397,7 @@ struct DbPath {
+ 
+   DbPath() : target_size(0) {}
+   DbPath(const std::string& p, uint64_t t) : path(p), target_size(t) {}
++  bool operator==(const DbPath& other) const = default;
+ };
+ 
+ extern const char* kHostnameForDbHostId;
+@@ -1008,6 +1009,8 @@ struct DBOptions {
+   // Create DBOptions from Options
+   explicit DBOptions(const Options& options);
+ 
++  bool operator==(const DBOptions& other) const = default;
++
+   void Dump(Logger* log) const;
+ 
+   // Allows OS to incrementally sync files to disk while they are being


Reply via email to