This is an automated email from the ASF dual-hosted git repository. martinzink pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 545236fd06b613f2d2b38f74ff9ee85df9190d59 Author: Gabor Gyimesi <gamezb...@gmail.com> AuthorDate: Tue Mar 12 13:51:58 2024 +0100 MINIFICPP-2298 Make RocksDB options configurable - Also set keep_log_file_num default value to 5 Closes #1731 Signed-off-by: Martin Zink <martinz...@apache.org> --- CONFIGURE.md | 22 +++++++++++ cmake/BundledRocksDB.cmake | 14 ++++--- .../rocksdb-repos/DatabaseContentRepository.cpp | 8 ++-- extensions/rocksdb-repos/FlowFileRepository.cpp | 7 ++-- extensions/rocksdb-repos/ProvenanceRepository.cpp | 7 ++-- .../controllers/RocksDbStateStorage.cpp | 7 ++-- .../rocksdb-repos/database/RocksDatabase.cpp | 12 +++--- extensions/rocksdb-repos/database/RocksDatabase.h | 5 ++- .../rocksdb-repos/database/RocksDbInstance.cpp | 29 ++++++++++---- .../rocksdb-repos/database/RocksDbInstance.h | 4 +- extensions/rocksdb-repos/database/RocksDbUtils.cpp | 25 ++++++++++++ extensions/rocksdb-repos/database/RocksDbUtils.h | 3 ++ .../tests/DBContentRepositoryTests.cpp | 2 +- .../rocksdb-repos/tests/RocksDBStreamTests.cpp | 2 +- extensions/rocksdb-repos/tests/RocksDBTests.cpp | 38 +++++++++--------- .../rocksdb-repos/tests/RocksDBUtilsTests.cpp | 46 ++++++++++++++++++++++ libminifi/include/properties/Configuration.h | 5 +++ libminifi/include/properties/Properties.h | 1 - .../rocksdb/dboptions_equality_operator.patch | 21 ++++++++++ 19 files changed, 198 insertions(+), 60 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index 16469db58..2df8d695a 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -425,6 +425,28 @@ Rocksdb has an option to run compaction at specific intervals not just when need nifi.flowfile.repository.rocksdb.compaction.period=2 min nifi.database.content.repository.rocksdb.compaction.period=2 min +### Global RocksDB options + +There are a few options for RocksDB that are set for all used RocksDB databases in MiNiFi: + `create_if_missing` is set to `true` + `use_direct_io_for_flush_and_compaction` is set to `true` + `use_direct_reads` is set 
to `true` + `keep_log_file_num` is set to `5` + +Any RocksDB option can be set or overridden using the `nifi.global.rocksdb.options.` prefix in the minifi.properties file. + + # in minifi.properties + nifi.global.rocksdb.options.keep_log_file_num=7 + nifi.global.rocksdb.options.atomic_flush=true + +RocksDB options can also be overridden for a specific repository using the `nifi.<repository>.rocksdb.options.` prefix in the minifi.properties file. + + # in minifi.properties + nifi.flowfile.repository.rocksdb.options.keep_log_file_num=3 + nifi.content.repository.rocksdb.options.atomic_flush=true + nifi.provenance.repository.rocksdb.options.use_direct_reads=false + nifi.state.storage.rocksdb.options.use_direct_io_for_flush_and_compaction=false + #### Shared database It is also possible to use a single database to store multiple repositories with the `minifidb://` scheme. diff --git a/cmake/BundledRocksDB.cmake b/cmake/BundledRocksDB.cmake index f1dc063d3..557d44df0 100644 --- a/cmake/BundledRocksDB.cmake +++ b/cmake/BundledRocksDB.cmake @@ -27,9 +27,12 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR) endif() # Patch to fix build issue on ARM7 architecture: https://github.com/facebook/rocksdb/issues/8609#issuecomment-1009572506 - set(PATCH_FILE "${SOURCE_DIR}/thirdparty/rocksdb/arm7.patch") - set(PC ${Bash_EXECUTABLE} -c "set -x && \ - (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE}\")") + set(PATCH_FILE_1 "${SOURCE_DIR}/thirdparty/rocksdb/arm7.patch") + set(PATCH_FILE_2 "${SOURCE_DIR}/thirdparty/rocksdb/dboptions_equality_operator.patch") + set(PC ${Bash_EXECUTABLE} -c "set -x &&\ + (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE_1}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE_1}\") &&\ + (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE_2}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE_2}\") ") + # Define byproducts if (WIN32) set(BYPRODUCT
"lib/rocksdb.lib") @@ -49,7 +52,6 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR) -DUSE_RTTI=1 -DROCKSDB_BUILD_SHARED=OFF -DFAIL_ON_WARNINGS=OFF - -DCMAKE_CXX_STANDARD=17 # RocksDB fails to build in C++20 mode on GCC 11: https://godbolt.org/z/YeMcEzs8W ) if(PORTABLE) list(APPEND ROCKSDB_CMAKE_ARGS -DPORTABLE=ON) @@ -71,8 +73,8 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR) # Build project ExternalProject_Add( rocksdb-external - URL "https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz" - URL_HASH "SHA256=9102704e169cfb53e7724a30750eeeb3e71307663852f01fa08d5a320e6155a8" + URL "https://github.com/facebook/rocksdb/archive/refs/tags/v8.10.2.tar.gz" + URL_HASH "SHA256=44b6ec2f4723a0d495762da245d4a59d38704e0d9d3d31c45af4014bee853256" SOURCE_DIR "${BINARY_DIR}/thirdparty/rocksdb-src" CMAKE_ARGS ${ROCKSDB_CMAKE_ARGS} PATCH_COMMAND ${PC} diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 65781fef9..b1cd9859c 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -56,10 +56,7 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu setCompactionPeriod(configuration); auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) { - db_opts.set(&rocksdb::DBOptions::create_if_missing, true); - db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true); - db_opts.set(&rocksdb::DBOptions::use_direct_reads, true); - db_opts.set(&rocksdb::DBOptions::error_if_exists, false); + minifi::internal::setCommonRocksDbOptions(db_opts); if (encrypted_env) { db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{}); } else { @@ -74,7 +71,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu cf_opts.compression = *compression_type; } }; - db_ = 
minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_); + db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_, + minifi::internal::getRocksDbOptionsToOverride(configuration, Configure::nifi_content_repository_rocksdb_options)); if (db_->open()) { logger_->log_debug("NiFi Content DB Repository database open {} success", directory_); is_valid_ = true; diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index 67f08cba5..ca81c795a 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -185,9 +185,7 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" : "plaintext"); auto db_options = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& options) { - options.set(&rocksdb::DBOptions::create_if_missing, true); - options.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true); - options.set(&rocksdb::DBOptions::use_direct_reads, true); + minifi::internal::setCommonRocksDbOptions(options); if (encrypted_env) { options.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{}); } else { @@ -210,7 +208,8 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) cf_opts.compression = *compression_type; } }; - db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_); + db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_, + minifi::internal::getRocksDbOptionsToOverride(configure, Configure::nifi_flowfile_repository_rocksdb_options)); if (db_->open()) { logger_->log_debug("NiFi FlowFile Repository database open {} success", directory_); return true; diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index 42bf840fa..68f404fe3 
100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -40,9 +40,7 @@ bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::m logger_->log_debug("MiNiFi Provenance Max Storage Time: [{}]", max_partition_millis_); auto db_options = [] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) { - db_opts.set(&rocksdb::DBOptions::create_if_missing, true); - db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true); - db_opts.set(&rocksdb::DBOptions::use_direct_reads, true); + minifi::internal::setCommonRocksDbOptions(db_opts); }; // Rocksdb write buffers act as a log of database operation: grow till reaching the limit, serialized after @@ -60,7 +58,8 @@ bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::m } }; - db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_); + db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_, + minifi::internal::getRocksDbOptionsToOverride(config, Configure::nifi_provenance_repository_rocksdb_options)); if (db_->open()) { logger_->log_debug("MiNiFi Provenance Repository database open {} success", directory_); } else { diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp index a07e1a451..965165ced 100644 --- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp +++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp @@ -67,9 +67,7 @@ void RocksDbStateStorage::onEnable() { logger_->log_info("Using {} RocksDbStateStorage", encrypted_env ? 
"encrypted" : "plaintext"); auto set_db_opts = [encrypted_env] (internal::Writable<rocksdb::DBOptions>& db_opts) { - db_opts.set(&rocksdb::DBOptions::create_if_missing, true); - db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true); - db_opts.set(&rocksdb::DBOptions::use_direct_reads, true); + minifi::internal::setCommonRocksDbOptions(db_opts); if (encrypted_env) { db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), core::repository::EncryptionEq{}); } else { @@ -81,7 +79,8 @@ void RocksDbStateStorage::onEnable() { cf_opts.write_buffer_size = 8ULL << 20U; cf_opts.min_write_buffer_number_to_merge = 1; }; - db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_); + db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_, + minifi::internal::getRocksDbOptionsToOverride(configuration_, Configure::nifi_state_storage_rocksdb_options)); if (db_->open()) { logger_->log_trace("Successfully opened RocksDB database at {}", directory_.c_str()); } else { diff --git a/extensions/rocksdb-repos/database/RocksDatabase.cpp b/extensions/rocksdb-repos/database/RocksDatabase.cpp index 78d31db18..c4a166a6b 100644 --- a/extensions/rocksdb-repos/database/RocksDatabase.cpp +++ b/extensions/rocksdb-repos/database/RocksDatabase.cpp @@ -29,7 +29,8 @@ namespace org::apache::nifi::minifi::internal { std::shared_ptr<core::logging::Logger> RocksDatabase::logger_ = core::logging::LoggerFactory<RocksDatabase>::getLogger(); -std::unique_ptr<RocksDatabase> RocksDatabase::create(const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, const std::string& uri, RocksDbMode mode) { +std::unique_ptr<RocksDatabase> RocksDatabase::create(const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, const std::string& uri, + const std::unordered_map<std::string, std::string>& db_config_override, RocksDbMode mode) { const std::string scheme = "minifidb://"; 
logger_->log_info("Acquiring database handle '{}'", uri); @@ -56,7 +57,7 @@ std::unique_ptr<RocksDatabase> RocksDatabase::create(const DBOptionsPatch& db_op if (mode == RocksDbMode::ReadOnly) { // no need to cache anything with read-only databases - return std::make_unique<RocksDatabase>(std::make_shared<RocksDbInstance>(db_path, mode), db_column, db_options_patch, cf_options_patch); + return std::make_unique<RocksDatabase>(std::make_shared<RocksDbInstance>(db_path, mode), db_column, db_options_patch, cf_options_patch, db_config_override); } static std::mutex mtx; @@ -74,12 +75,13 @@ std::unique_ptr<RocksDatabase> RocksDatabase::create(const DBOptionsPatch& db_op logger_->log_info("Using previously opened rocksdb instance '{}'", db_path); } } - return std::make_unique<RocksDatabase>(instance, db_column, db_options_patch, cf_options_patch); + return std::make_unique<RocksDatabase>(instance, db_column, db_options_patch, cf_options_patch, db_config_override); } -RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch) +RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, + const std::unordered_map<std::string, std::string>& db_config_override) : column_(std::move(column)), db_(std::move(db)) { - db_->registerColumnConfig(column_, db_options_patch, cf_options_patch); + db_->registerColumnConfig(column_, db_options_patch, cf_options_patch, db_config_override); } RocksDatabase::~RocksDatabase() { diff --git a/extensions/rocksdb-repos/database/RocksDatabase.h b/extensions/rocksdb-repos/database/RocksDatabase.h index acaa3a6d4..7bcb229e9 100644 --- a/extensions/rocksdb-repos/database/RocksDatabase.h +++ b/extensions/rocksdb-repos/database/RocksDatabase.h @@ -21,6 +21,7 @@ #include <memory> #include <optional> #include <string> 
+#include <unordered_map> #include "rocksdb/db.h" #include "logging/Logger.h" @@ -40,9 +41,11 @@ class RocksDatabase { static std::unique_ptr<RocksDatabase> create(const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, const std::string& uri, + const std::unordered_map<std::string, std::string>& db_config_override, RocksDbMode mode = RocksDbMode::ReadWrite); - RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch); + RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, + const std::unordered_map<std::string, std::string>& db_config_override); RocksDatabase(const RocksDatabase&) = delete; RocksDatabase(RocksDatabase&&) = delete; diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.cpp b/extensions/rocksdb-repos/database/RocksDbInstance.cpp index 0bab95c1d..4c23e0ec8 100644 --- a/extensions/rocksdb-repos/database/RocksDbInstance.cpp +++ b/extensions/rocksdb-repos/database/RocksDbInstance.cpp @@ -42,9 +42,11 @@ void RocksDbInstance::invalidate(const std::lock_guard<std::mutex>&) { impl_.reset(); } -void RocksDbInstance::registerColumnConfig(const std::string& column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch) { +void RocksDbInstance::registerColumnConfig(const std::string& column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, + const std::unordered_map<std::string, std::string>& db_config_override) { std::lock_guard<std::mutex> db_guard{mtx_}; logger_->log_trace("Registering column '{}' in database '{}'", column, db_name_); + db_config_override_ = db_config_override; auto [_, inserted] = column_configs_.insert({column, ColumnConfig{.dbo_patch = db_options_patch, .cfo_patch = cf_options_patch}}); if (!inserted) { throw 
std::runtime_error("Configuration is already registered for column '" + column + "'"); @@ -60,8 +62,14 @@ void RocksDbInstance::registerColumnConfig(const std::string& column, const DBOp Writable<rocksdb::DBOptions> db_opts_writer(db_opts_copy); if (db_options_patch) { db_options_patch(db_opts_writer); - if (db_opts_writer.isModified()) { - logger_->log_trace("Requested a difference DBOptions than the one that was used to open the database"); + rocksdb::ConfigOptions conf_options; + conf_options.sanity_level = rocksdb::ConfigOptions::kSanityLevelLooselyCompatible; + auto db_config_override_status = rocksdb::GetDBOptionsFromMap(conf_options, db_opts_copy, db_config_override_, &db_opts_copy); + if (!db_config_override_status.ok()) { + throw std::runtime_error("Failed to override RocksDB options from minifi.properties file: " + db_config_override_status.ToString()); + } + if (db_opts_copy != db_options_) { + logger_->log_trace("Requested a different DBOptions than the one that was used to open the database"); return true; } } @@ -130,7 +138,7 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column) { } db_options_ = rocksdb::DBOptions{}; std::vector<rocksdb::ColumnFamilyDescriptor> cf_descriptors; - rocksdb::Status option_status = rocksdb::LoadLatestOptions(conf_options, db_name_, &db_options_, &cf_descriptors); + rocksdb::Status latest_option_status = rocksdb::LoadLatestOptions(conf_options, db_name_, &db_options_, &cf_descriptors); { // apply the database options patchers Writable<rocksdb::DBOptions> db_options_writer(db_options_); @@ -148,14 +156,19 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column) { } } } - if (option_status.ok()) { + auto db_config_override_status = rocksdb::GetDBOptionsFromMap(conf_options, db_options_, db_config_override_, &db_options_); + if (!db_config_override_status.ok()) { + logger_->log_error("Failed to override RocksDB options from minifi.properties file: {}", 
db_config_override_status.ToString()); + return std::nullopt; + } + if (latest_option_status.ok()) { logger_->log_trace("Found existing database '{}', checking compatibility", db_name_); rocksdb::Status compat_status = rocksdb::CheckOptionsCompatibility(conf_options, db_name_, db_options_, cf_descriptors); if (!compat_status.ok()) { logger_->log_error("Incompatible database options: {}", compat_status.ToString()); return std::nullopt; } - } else if (option_status.IsNotFound()) { + } else if (latest_option_status.IsNotFound()) { logger_->log_trace("Database at '{}' not found, creating", db_name_); rocksdb::ColumnFamilyOptions default_cf_options; if (auto it = column_configs_.find("default"); it != column_configs_.end()) { @@ -164,8 +177,8 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column) { } } cf_descriptors.emplace_back("default", default_cf_options); - } else if (!option_status.ok()) { - logger_->log_error("Couldn't query database '{}' for options: '{}'", db_name_, option_status.ToString()); + } else if (!latest_option_status.ok()) { + logger_->log_error("Couldn't query database '{}' for options: '{}'", db_name_, latest_option_status.ToString()); return std::nullopt; } std::vector<rocksdb::ColumnFamilyHandle*> column_handles; diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.h b/extensions/rocksdb-repos/database/RocksDbInstance.h index 182324958..b4796c4eb 100644 --- a/extensions/rocksdb-repos/database/RocksDbInstance.h +++ b/extensions/rocksdb-repos/database/RocksDbInstance.h @@ -61,11 +61,13 @@ class RocksDbInstance { void invalidate(const std::lock_guard<std::mutex>&); - void registerColumnConfig(const std::string& column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch); + void registerColumnConfig(const std::string& column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, + const std::unordered_map<std::string, std::string>& 
db_config_override); void unregisterColumnConfig(const std::string& column); rocksdb::DBOptions db_options_; const std::string db_name_; + std::unordered_map<std::string, std::string> db_config_override_; const RocksDbMode mode_; std::mutex mtx_; diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.cpp b/extensions/rocksdb-repos/database/RocksDbUtils.cpp index d3834dfef..223cbb27e 100644 --- a/extensions/rocksdb-repos/database/RocksDbUtils.cpp +++ b/extensions/rocksdb-repos/database/RocksDbUtils.cpp @@ -51,4 +51,29 @@ std::optional<rocksdb::CompressionType> readConfiguredCompressionType(const std: #endif } + +void setCommonRocksDbOptions(Writable<rocksdb::DBOptions>& db_opts) { + db_opts.set(&rocksdb::DBOptions::create_if_missing, true); + db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true); + db_opts.set(&rocksdb::DBOptions::use_direct_reads, true); + db_opts.set(&rocksdb::DBOptions::keep_log_file_num, 5); +} + +std::unordered_map<std::string, std::string> getRocksDbOptionsToOverride(const std::shared_ptr<Configure> &configuration, std::string_view custom_db_prefix) { + std::unordered_map<std::string, std::string> options; + const auto addOverrideOptions = [&configuration, &options](std::string_view prefix) { + if (prefix.empty()) { + return; + } + for (const auto& [key, value] : configuration->getProperties()) { + if (key.starts_with(prefix)) { + options[key.substr(prefix.size())] = value; + } + } + }; + addOverrideOptions(Configuration::nifi_global_rocksdb_options); + addOverrideOptions(custom_db_prefix); + return options; +} + } // namespace org::apache::nifi::minifi::internal diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h b/extensions/rocksdb-repos/database/RocksDbUtils.h index b5748c117..0deac2c85 100644 --- a/extensions/rocksdb-repos/database/RocksDbUtils.h +++ b/extensions/rocksdb-repos/database/RocksDbUtils.h @@ -22,6 +22,7 @@ #include <optional> #include <memory> #include <string> +#include <unordered_map> 
#include "rocksdb/db.h" #include "rocksdb/options.h" @@ -71,5 +72,7 @@ using DBOptionsPatch = std::function<void(Writable<rocksdb::DBOptions>&)>; using ColumnFamilyOptionsPatch = std::function<void(rocksdb::ColumnFamilyOptions&)>; std::optional<rocksdb::CompressionType> readConfiguredCompressionType(const std::shared_ptr<Configure> &configuration, const std::string& config_key); +void setCommonRocksDbOptions(Writable<rocksdb::DBOptions>& db_opts); +std::unordered_map<std::string, std::string> getRocksDbOptionsToOverride(const std::shared_ptr<Configure> &configuration, std::string_view custom_db_prefix); } // namespace org::apache::nifi::minifi::internal diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp index d46601930..5f9eb7b25 100644 --- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp +++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp @@ -330,7 +330,7 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash (Ro } size_t getDbSize(const std::filesystem::path& dir) { - auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string()); + auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string(), {}); auto opendb = db->open(); REQUIRE(opendb); diff --git a/extensions/rocksdb-repos/tests/RocksDBStreamTests.cpp b/extensions/rocksdb-repos/tests/RocksDBStreamTests.cpp index 4b8e2563d..e9433e576 100644 --- a/extensions/rocksdb-repos/tests/RocksDBStreamTests.cpp +++ b/extensions/rocksdb-repos/tests/RocksDBStreamTests.cpp @@ -34,7 +34,7 @@ class RocksDBStreamTest : TestController { auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) { cf_opts.merge_operator = std::make_shared<core::repository::StringAppender>(); }; - db = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, dbPath); + db = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, dbPath, {}); REQUIRE(db->open()); } diff 
--git a/extensions/rocksdb-repos/tests/RocksDBTests.cpp b/extensions/rocksdb-repos/tests/RocksDBTests.cpp index 51cea8e7c..8a923a051 100644 --- a/extensions/rocksdb-repos/tests/RocksDBTests.cpp +++ b/extensions/rocksdb-repos/tests/RocksDBTests.cpp @@ -70,7 +70,7 @@ void new_db_opts(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) { } TEST_CASE_METHOD(RocksDBTest, "Malformed database uri - Missing column name", "[rocksDBTest1]") { - auto db = minifi::internal::RocksDatabase::create({}, {}, "minifidb://malformed"); + auto db = minifi::internal::RocksDatabase::create({}, {}, "minifidb://malformed", {}); REQUIRE(!db); REQUIRE(utils::verifyLogLinePresenceInPollTime( std::chrono::seconds{1}, "Couldn't detect the column name in 'minifidb://malformed'")); @@ -78,7 +78,7 @@ TEST_CASE_METHOD(RocksDBTest, "Malformed database uri - Missing column name", "[ TEST_CASE_METHOD(RocksDBTest, "Can write to default column", "[rocksDBTest2]") { { - auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir); + auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir, {}); auto opendb = db->open(); opendb->Put(rocksdb::WriteOptions{}, "fruit", "apple"); } @@ -92,7 +92,7 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to default column", "[rocksDBTest2]") { TEST_CASE_METHOD(RocksDBTest, "Can write to specific column using the rocksdb uri scheme", "[rocksDBTest3]") { { - auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_one"); + auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_one", {}); auto opendb = db->open(); opendb->Put(rocksdb::WriteOptions{}, "fruit", "apple"); } @@ -106,11 +106,11 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to specific column using the rocksdb ur TEST_CASE_METHOD(RocksDBTest, "Can write to two specific columns at once", "[rocksDBTest4]") { { - auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + 
db_dir + "/column_one"); + auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_one", {}); auto opendb1 = db1->open(); REQUIRE(opendb1); opendb1->Put(rocksdb::WriteOptions{}, "fruit", "apple"); - auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_two"); + auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_two", {}); auto opendb2 = db2->open(); REQUIRE(opendb2); opendb2->Put(rocksdb::WriteOptions{}, "animal", "penguin"); @@ -128,11 +128,11 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to two specific columns at once", "[roc TEST_CASE_METHOD(RocksDBTest, "Can write to the default and a specific column at once", "[rocksDBTest5]") { { - auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_one"); + auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_one", {}); auto opendb1 = db1->open(); REQUIRE(opendb1); opendb1->Put(rocksdb::WriteOptions{}, "fruit", "apple"); - auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir); + auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir, {}); auto opendb2 = db2->open(); REQUIRE(opendb2); opendb2->Put(rocksdb::WriteOptions{}, "animal", "penguin"); @@ -149,13 +149,13 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to the default and a specific column at } TEST_CASE_METHOD(RocksDBTest, "Logged if the options are incompatible with an existing column family", "[rocksDBTest6]") { - auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_one"); + auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/column_one", {}); REQUIRE(db->open()); // implicitly created the "default" column family, but with the default options auto cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) { 
cf_opts.merge_operator = std::make_shared<minifi::core::repository::StringAppender>(); }; - auto default_db = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, "minifidb://" + db_dir + "/default"); + auto default_db = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, "minifidb://" + db_dir + "/default", {}); REQUIRE(default_db->open()); REQUIRE(utils::verifyLogLinePresenceInPollTime( std::chrono::seconds{1}, "Could not determine if we definitely need to reopen or we are definitely safe, requesting reopen")); @@ -170,16 +170,16 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if different DBOptions are used", db_opts.set(&rocksdb::DBOptions::create_if_missing, true); db_opts.set(&rocksdb::DBOptions::manual_wal_flush, true); }; - auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, "minifidb://" + db_dir + "/column_one"); + auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, "minifidb://" + db_dir + "/column_one", {}); REQUIRE(col_1->open()); - auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two"); + auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two", {}); REQUIRE_FALSE(col_2->open()); REQUIRE(utils::verifyLogLinePresenceInPollTime( std::chrono::seconds{1}, "Conflicting database options requested for '" + db_dir + "'")); } TEST_CASE_METHOD(RocksDBTest, "Sanity check: merge fails without merge_operator", "[rocksDBTest8]") { - auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/col_one"); + auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, "minifidb://" + db_dir + "/col_one", {}); REQUIRE(db); auto opendb = db->open(); @@ -203,7 +203,7 @@ TEST_CASE_METHOD(RocksDBTest, "Column options are applied", "[rocksDBTest9]") { SECTION("Implicit default column") { db_uri = db_dir; } - auto db = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, 
db_uri); + auto db = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, db_uri, {}); REQUIRE(db); auto opendb = db->open(); @@ -238,25 +238,25 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if different encryption keys are << "encryption.key.two=" << "905D7B95EF44DC27C87FFBC4DFDE376DAE604D55DB2C5496DEEF5236362DE62E" << "\n"; auto db_opt_1 = createEncrSetter(home_dir, "one", "encryption.key.one"); - auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, "minifidb://" + db_dir + "/column_one"); + auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, "minifidb://" + db_dir + "/column_one", {}); REQUIRE(col_1->open()); SECTION("Using the same encryption key is OK") { auto db_opt_2 = createEncrSetter(home_dir, "two", "encryption.key.one"); - auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two"); + auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two", {}); REQUIRE(col_2->open()); } SECTION("Using different encryption key") { auto db_opt_2 = createEncrSetter(home_dir, "two", "encryption.key.two"); - auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two"); + auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, "minifidb://" + db_dir + "/column_two", {}); REQUIRE_FALSE(col_2->open()); REQUIRE(utils::verifyLogLinePresenceInPollTime( std::chrono::seconds{1}, "Conflicting database options requested for '" + db_dir + "'")); } SECTION("Using no encryption key") { - auto col_2 = minifi::internal::RocksDatabase::create(withDefaultEnv, {}, "minifidb://" + db_dir + "/column_two"); + auto col_2 = minifi::internal::RocksDatabase::create(withDefaultEnv, {}, "minifidb://" + db_dir + "/column_two", {}); REQUIRE_FALSE(col_2->open()); REQUIRE(utils::verifyLogLinePresenceInPollTime( std::chrono::seconds{1}, "Conflicting database options requested for '" + db_dir + "'")); @@ -265,7 
+265,7 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if different encryption keys are TEST_CASE_METHOD(RocksDBTest, "RocksDb works correctly on changed (but compatible) ColumnFamilyOptions change", "[rocksDBTest11]") { { - auto col1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir); + auto col1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, db_dir, {}); auto opendb = col1->open(); REQUIRE(opendb); REQUIRE(opendb->Put({}, "fruit", "apple").ok()); @@ -275,7 +275,7 @@ TEST_CASE_METHOD(RocksDBTest, "RocksDb works correctly on changed (but compatibl cf_opts.OptimizeForPointLookup(4); }; - auto col2 = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, db_dir); + auto col2 = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, db_dir, {}); REQUIRE(col2); { auto opendb = col2->open(); diff --git a/extensions/rocksdb-repos/tests/RocksDBUtilsTests.cpp b/extensions/rocksdb-repos/tests/RocksDBUtilsTests.cpp new file mode 100644 index 000000000..9b7f662f5 --- /dev/null +++ b/extensions/rocksdb-repos/tests/RocksDBUtilsTests.cpp @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <unordered_map> +#include <string> + +#include "TestBase.h" +#include "Catch.h" +#include "../database/RocksDbUtils.h" +#include "properties/Configure.h" + +namespace org::apache::nifi::minifi::test { + +TEST_CASE("Get global RocksDB options to override", "[rocksdbutils]") { + std::unordered_map<std::string, std::string> expected_options { + {"keep_log_file_num", "2"}, + {"table_cache_numshardbits", "456"}, + {"create_if_missing", "false"} + }; + + auto config = std::make_shared<minifi::Configure>(); + config->set("nifi.c2.rest.url", "http://localhost:8080"); + config->set("nifi.global.rocksdb.options.table_cache_numshardbits", "456"); + config->set("nifi.global.rocksdb.options.keep_log_file_num", "123"); + config->set("nifi.flowfile.repository.rocksdb.options.keep_log_file_num", "2"); + config->set("nifi.flowfile.repository.rocksdb.options.create_if_missing", "false"); + config->set("nifi.c2.enable", "true"); + auto result = org::apache::nifi::minifi::internal::getRocksDbOptionsToOverride(config, minifi::Configure::nifi_flowfile_repository_rocksdb_options); + REQUIRE(result == expected_options); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index 579adca34..e50bc3115 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -36,6 +36,11 @@ class Configuration : public Properties { Configuration() : Properties("MiNiFi configuration") {} static constexpr const char *nifi_volatile_repository_options = "nifi.volatile.repository.options."; + static constexpr const char *nifi_global_rocksdb_options = "nifi.global.rocksdb.options."; + static constexpr const char *nifi_flowfile_repository_rocksdb_options = "nifi.flowfile.repository.rocksdb.options."; + static constexpr const char *nifi_content_repository_rocksdb_options = "nifi.content.repository.rocksdb.options."; + static constexpr const 
char *nifi_provenance_repository_rocksdb_options = "nifi.provenance.repository.rocksdb.options."; + static constexpr const char *nifi_state_storage_rocksdb_options = "nifi.state.storage.rocksdb.options."; // nifi.flow.configuration.file static constexpr const char *nifi_default_directory = "nifi.default.directory"; diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index e86ec4b5e..6d427d64f 100644 --- a/libminifi/include/properties/Properties.h +++ b/libminifi/include/properties/Properties.h @@ -139,7 +139,6 @@ class Properties { std::filesystem::path getFilePath() const; - protected: std::map<std::string, std::string> getProperties() const; private: diff --git a/thirdparty/rocksdb/dboptions_equality_operator.patch b/thirdparty/rocksdb/dboptions_equality_operator.patch new file mode 100644 index 000000000..2efb91ea0 --- /dev/null +++ b/thirdparty/rocksdb/dboptions_equality_operator.patch @@ -0,0 +1,21 @@ +diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h +index ae5ed2c26..0038c6bff 100644 +--- a/include/rocksdb/options.h ++++ b/include/rocksdb/options.h +@@ -397,6 +397,7 @@ struct DbPath { + + DbPath() : target_size(0) {} + DbPath(const std::string& p, uint64_t t) : path(p), target_size(t) {} ++ bool operator==(const DbPath& other) const = default; + }; + + extern const char* kHostnameForDbHostId; +@@ -1008,6 +1009,8 @@ struct DBOptions { + // Create DBOptions from Options + explicit DBOptions(const Options& options); + ++ bool operator==(const DBOptions& other) const = default; ++ + void Dump(Logger* log) const; + + // Allows OS to incrementally sync files to disk while they are being