[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #877: MINIFICPP-1339 - Reopen rocksdb database when space becomes available on the disk
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r477068493 ## File path: extensions/rocksdb-repos/FlowFileRepository.cpp ## @@ -91,14 +95,18 @@ void FlowFileRepository::flush() { } void FlowFileRepository::printStats() { + auto opendb = db_->open(); + if (!opendb) { +return; Review comment: logging error on failed `open` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r476475343 ## File path: extensions/rocksdb-repos/FlowFileRepository.cpp ## @@ -194,22 +201,27 @@ bool FlowFileRepository::ExecuteWithRetry(std::function opera * Returns True if there is data to interrogate. * @return true if our db has data stored. */ -bool FlowFileRepository::need_checkpoint(){ - std::unique_ptr it = std::unique_ptr(db_->NewIterator(rocksdb::ReadOptions())); +bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDB& opendb){ + auto it = opendb.NewIterator(rocksdb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { Review comment: Great question. This whole checkpointing logic needs to be revisited because it is not really fault tolerant: for example, it starts reading from the live db if it couldn't open the checkpoint, and there is no retry with the checkpoint. For now I'll replace this magical loop.
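The body of the "magical loop" is not shown in the quoted hunk. Assuming it simply returns `true` on the first valid entry and `false` otherwise, the loop can be replaced by a single seek followed by a validity check. The sketch below uses a hypothetical `FakeIterator` that only mimics the three `rocksdb::Iterator` calls visible in the hunk (`SeekToFirst`, `Valid`, `Next`); the function names are illustrative, not taken from the PR.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for rocksdb::Iterator, reduced to the calls used in the hunk.
class FakeIterator {
 public:
  explicit FakeIterator(std::vector<std::string> keys) : keys_(std::move(keys)) {}
  void SeekToFirst() { pos_ = 0; }
  bool Valid() const { return pos_ < keys_.size(); }
  void Next() { ++pos_; }

 private:
  std::vector<std::string> keys_;
  std::size_t pos_ = 0;
};

// Assumed shape of the original: a loop that never runs more than one iteration.
bool has_data_loop(FakeIterator& it) {
  for (it.SeekToFirst(); it.Valid(); it.Next()) {
    return true;
  }
  return false;
}

// Equivalent and easier to read: position on the first entry, then ask whether it exists.
bool has_data(FakeIterator& it) {
  it.SeekToFirst();
  return it.Valid();
}
```

Both functions answer the same question ("is there at least one entry?"), but the second states it directly.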
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r476472773 ## File path: extensions/rocksdb-repos/FlowFileRepository.cpp ## @@ -37,6 +37,10 @@ namespace core { namespace repository { void FlowFileRepository::flush() { + auto opendb = db_->open(); + if (!opendb) { +return; Review comment: an unsuccessful `open` now logs an error
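A minimal sketch of the pattern discussed here: `open` reports the failure itself, so callers such as `flush()` can return early without repeating the log statement. `FakeDatabase`, `flush_repository`, the log text, and the use of `std::optional` (in place of the project's `utils::optional`) are all assumptions made for illustration.

```cpp
#include <iostream>
#include <optional>
#include <string>

// Hypothetical stand-in for the PR's RocksDatabase; not the real class.
class FakeDatabase {
 public:
  explicit FakeDatabase(bool can_open) : can_open_(can_open) {}

  // Returns a handle on success; logs and returns std::nullopt on failure.
  std::optional<std::string> open() {
    if (!can_open_) {
      std::cerr << "Failed to open database" << std::endl;
      return std::nullopt;
    }
    return std::string{"handle"};
  }

 private:
  bool can_open_;
};

// Mirrors the early return in the quoted hunk: nothing to do if the db is unavailable.
bool flush_repository(FakeDatabase& db) {
  auto opendb = db.open();
  if (!opendb) {
    return false;  // the error was already logged inside open()
  }
  // ... work with *opendb ...
  return true;
}
```

Logging in one place keeps every call site down to the two-line check shown in the diff.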
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r476239186 ## File path: extensions/rocksdb-repos/RocksDatabase.cpp ## @@ -0,0 +1,138 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RocksDatabase.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace internal { + +OpenRocksDB::OpenRocksDB(gsl::not_null db, gsl::not_null> impl) : db_(std::move(db)), impl_(std::move(impl)) {} + +rocksdb::Status OpenRocksDB::Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) { + rocksdb::Status result = impl_->Put(options, key, value); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::Status OpenRocksDB::Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value) { + rocksdb::Status result = impl_->Get(options, key, value); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +std::vector OpenRocksDB::MultiGet(const rocksdb::ReadOptions& options, const std::vector& keys, std::vector* values) { + std::vector results = impl_->MultiGet(options, keys, values); + for (const auto& result : results) { +if (result == rocksdb::Status::NoSpace()) { + db_->invalidate(); +} + } + return results; +} + +rocksdb::Status OpenRocksDB::Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates) { + rocksdb::Status result = impl_->Write(options, updates); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::Status OpenRocksDB::Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key) { + rocksdb::Status result = impl_->Delete(options, key); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::Status OpenRocksDB::Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) { + rocksdb::Status result = impl_->Merge(options, key, value); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +bool OpenRocksDB::GetProperty(const rocksdb::Slice& property, std::string* 
value) { + return impl_->GetProperty(property, value); +} + +std::unique_ptr OpenRocksDB::NewIterator(const rocksdb::ReadOptions& options) { + return std::unique_ptr{impl_->NewIterator(options)}; +} + +rocksdb::Status OpenRocksDB::NewCheckpoint(rocksdb::Checkpoint **checkpoint) { + return rocksdb::Checkpoint::Create(impl_.get(), checkpoint); +} + +rocksdb::Status OpenRocksDB::FlushWAL(bool sync) { + rocksdb::Status result = impl_->FlushWAL(sync); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::DB* OpenRocksDB::get() { + return impl_.get(); +} + +RocksDatabase::RocksDatabase(const rocksdb::Options& options, const std::string& name, Mode mode) : open_options_(options), db_name_(name), mode_(mode) {} + +void RocksDatabase::invalidate() { + std::lock_guard db_guard{ mtx_ }; + // discard our own instance + impl_.reset(); +} + +utils::optional RocksDatabase::open() { + std::lock_guard db_guard{ mtx_ }; + if (!impl_) { +// database is not opened yet +rocksdb::DB* db_instance = nullptr; +rocksdb::Status result; +if (mode_ == Mode::ReadWrite) { Review comment: 🤔 I think we should enable that error, although not in this PR
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r476234461 ## File path: extensions/rocksdb-repos/RocksDatabase.cpp ## @@ -0,0 +1,139 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RocksDatabase.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace internal { + +OpenRocksDB::OpenRocksDB(gsl::not_null db, gsl::not_null> impl) : db_(std::move(db)), impl_(std::move(impl)) {} + +rocksdb::Status OpenRocksDB::Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) { + rocksdb::Status result = impl_->Put(options, key, value); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::Status OpenRocksDB::Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value) { + rocksdb::Status result = impl_->Get(options, key, value); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +std::vector OpenRocksDB::MultiGet(const rocksdb::ReadOptions& options, const std::vector& keys, std::vector* values) { + std::vector results = impl_->MultiGet(options, keys, values); + for (const auto& result : results) { +if (result == rocksdb::Status::NoSpace()) { + db_->invalidate(); +} + } Review comment: Nice catch (although `invalidate` is idempotent). I just added a `break`, because the proposed solution is too verbose for my liking. I dream of a day when we can write something like `results.some(_ == rocksdb::Status::NoSpace())`, but that will probably arrive in `C++never`.
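For comparison, the closest standard-library spelling of the wished-for `results.some(...)` is `std::any_of`, which also stops at the first match, just like the added `break`. The sketch below uses a hypothetical `Status` enum in place of `rocksdb::Status`, and `any_no_space` is an illustrative name; this is not necessarily the solution that was proposed in the review.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical stand-in for rocksdb::Status, reduced to the distinction that matters here.
enum class Status { Ok, NotFound, NoSpace };

// True if at least one result reports "no space"; std::any_of short-circuits on the first hit.
bool any_no_space(const std::vector<Status>& results) {
  return std::any_of(results.begin(), results.end(),
                     [](Status s) { return s == Status::NoSpace; });
}
```

A caller would then write `if (any_no_space(results)) { db_->invalidate(); }`, which invalidates at most once per `MultiGet` call.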
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r475690750 ## File path: extensions/rocksdb-repos/RocksDbStream.h ## @@ -160,7 +160,7 @@ class RocksDbStream : public io::BaseStream { std::string value_; - rocksdb::DB *db_; + gsl::not_null db_; Review comment: But since nobody seems to share this feeling (here or on the "internet"), I will modify the constructor to take a reference.
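A minimal sketch of the outcome described above: the constructor takes a reference, so a null argument is impossible at the call site, while the member remains a non-owning pointer. `Database` and `Stream` are hypothetical stand-ins, and a raw pointer is used where the PR stores a `gsl::not_null`, purely to keep the example free of the GSL dependency.

```cpp
#include <string>

// Hypothetical stand-in for the database object the stream refers to.
struct Database {
  std::string name;
};

class Stream {
 public:
  // Taking a reference rules out null; the caller must keep `db` alive
  // for as long as this Stream exists, because only its address is stored.
  explicit Stream(Database& db) : db_(&db) {}

  const std::string& database_name() const { return db_->name; }

 private:
  Database* db_;  // never null: it can only be initialised from a reference
};
```

The trade-off raised in the thread still applies: the signature alone does not tell the caller that the object is retained, so the lifetime requirement has to be documented.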
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r475689892 ## File path: extensions/rocksdb-repos/RocksDbStream.h ## @@ -160,7 +160,7 @@ class RocksDbStream : public io::BaseStream { std::string value_; - rocksdb::DB *db_; + gsl::not_null db_; Review comment: I couldn't find a convincing argument either, only a gut feeling: given `U f(T& t)` and `U g(T* t)`, I'm more reluctant to discard the `t` I provided after calling `g` than after calling `f`. By writing `f`, I feel I'm signaling that I only need the object for the duration of the function.
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r475671668 ## File path: extensions/rocksdb-repos/RocksDatabase.cpp ## @@ -0,0 +1,138 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RocksDatabase.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace internal { + +OpenRocksDB::OpenRocksDB(gsl::not_null db, gsl::not_null> impl) : db_(std::move(db)), impl_(std::move(impl)) {} + +rocksdb::Status OpenRocksDB::Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) { + rocksdb::Status result = impl_->Put(options, key, value); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::Status OpenRocksDB::Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value) { + rocksdb::Status result = impl_->Get(options, key, value); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +std::vector OpenRocksDB::MultiGet(const rocksdb::ReadOptions& options, const std::vector& keys, std::vector* values) { + std::vector results = impl_->MultiGet(options, keys, values); + for (const auto& result : results) { +if (result == rocksdb::Status::NoSpace()) { + db_->invalidate(); +} + } + return results; +} + +rocksdb::Status OpenRocksDB::Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates) { + rocksdb::Status result = impl_->Write(options, updates); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::Status OpenRocksDB::Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key) { + rocksdb::Status result = impl_->Delete(options, key); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::Status OpenRocksDB::Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) { + rocksdb::Status result = impl_->Merge(options, key, value); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +bool OpenRocksDB::GetProperty(const rocksdb::Slice& property, std::string* 
value) { + return impl_->GetProperty(property, value); +} + +std::unique_ptr OpenRocksDB::NewIterator(const rocksdb::ReadOptions& options) { + return std::unique_ptr{impl_->NewIterator(options)}; +} + +rocksdb::Status OpenRocksDB::NewCheckpoint(rocksdb::Checkpoint **checkpoint) { + return rocksdb::Checkpoint::Create(impl_.get(), checkpoint); +} + +rocksdb::Status OpenRocksDB::FlushWAL(bool sync) { + rocksdb::Status result = impl_->FlushWAL(sync); + if (result == rocksdb::Status::NoSpace()) { +db_->invalidate(); + } + return result; +} + +rocksdb::DB* OpenRocksDB::get() { + return impl_.get(); +} + +RocksDatabase::RocksDatabase(const rocksdb::Options& options, const std::string& name, Mode mode) : open_options_(options), db_name_(name), mode_(mode) {} + +void RocksDatabase::invalidate() { + std::lock_guard db_guard{ mtx_ }; + // discard our own instance + impl_.reset(); +} + +utils::optional RocksDatabase::open() { + std::lock_guard db_guard{ mtx_ }; + if (!impl_) { +// database is not opened yet +rocksdb::DB* db_instance = nullptr; +rocksdb::Status result; +if (mode_ == Mode::ReadWrite) { Review comment: could we promote such a warning to an error at the file/function/block level?
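One way to do this with GCC or Clang is the `#pragma GCC diagnostic` family, which can raise a single warning to an error for a region of a file. The thread does not name the warning in question, so `-Wswitch` (an enumerator missing from a `switch`) is used below purely as an assumption, and `mode_name` is a hypothetical helper; other compilers need a different mechanism.

```cpp
#include <string>

enum class Mode { ReadOnly, ReadWrite };

// Assumption: GCC or Clang. Between push and pop, -Wswitch is treated as an error,
// so dropping a `case` for one of the enumerators below should fail the build.
#pragma GCC diagnostic push
#pragma GCC diagnostic error "-Wswitch"
std::string mode_name(Mode mode) {
  switch (mode) {
    case Mode::ReadOnly: return "ReadOnly";
    case Mode::ReadWrite: return "ReadWrite";
  }
  return "unknown";  // reached only for values outside the enumeration
}
#pragma GCC diagnostic pop
```

Wrapping a single function gives function-level scope; placing the `push` at the top of the file and the `pop` at the end covers the whole file.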
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r475634470 ## File path: extensions/rocksdb-repos/FlowFileRepository.cpp ## @@ -134,14 +142,18 @@ void FlowFileRepository::prune_stored_flowfiles() { if (status.ok()) { Review comment: Made it possible to create a `RocksDatabase` in `ReadOnly` mode, so that `::open` acts accordingly. This cleaned up the three different DB variables.
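The idea can be sketched roughly as follows: the wrapper remembers the mode it was created with, opens the underlying database lazily, and opens it again after `invalidate()` has discarded the instance. `FakeDb`, `Database`, `OpenDb`, `report_no_space` and `open_count` are hypothetical names invented for this sketch, and the `std::shared_ptr` ownership and `std::optional` return type are assumptions. Real code would call `rocksdb::DB::Open` or `rocksdb::DB::OpenForReadOnly` depending on the mode and return an empty optional on failure.

```cpp
#include <memory>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical stand-in for rocksdb::DB; it only records how it was opened.
struct FakeDb {
  bool read_only;
};

class Database;

// Handle to an opened database, loosely modelled on OpenRocksDB.
class OpenDb {
 public:
  OpenDb(Database& owner, std::shared_ptr<FakeDb> impl)
      : owner_(&owner), impl_(std::move(impl)) {}
  bool read_only() const { return impl_->read_only; }
  // Simulates an operation that failed for lack of space: the owner drops its instance.
  void report_no_space();

 private:
  Database* owner_;
  std::shared_ptr<FakeDb> impl_;
};

class Database {
 public:
  enum class Mode { ReadOnly, ReadWrite };
  explicit Database(Mode mode) : mode_(mode) {}

  // Discards the current instance so that the next open() starts from scratch.
  void invalidate() {
    std::lock_guard<std::mutex> guard{mtx_};
    impl_.reset();
  }

  // Opens lazily, honouring the mode chosen at construction.
  std::optional<OpenDb> open() {
    std::lock_guard<std::mutex> guard{mtx_};
    if (!impl_) {
      impl_ = std::make_shared<FakeDb>(FakeDb{mode_ == Mode::ReadOnly});
      ++open_count_;
    }
    return OpenDb{*this, impl_};
  }

  int open_count() const { return open_count_; }

 private:
  Mode mode_;
  std::mutex mtx_;
  std::shared_ptr<FakeDb> impl_;
  int open_count_ = 0;
};

inline void OpenDb::report_no_space() { owner_->invalidate(); }
```

Because the mode lives inside the wrapper, callers no longer need separate variables for differently opened databases; they construct one wrapper per use and call `open()`.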
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r475565552 ## File path: extensions/rocksdb-repos/RocksDatabase.h ## @@ -0,0 +1,98 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "utils/OptionalUtils.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/checkpoint.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace internal { + +class RocksDatabase; + +// Not thread safe +class OpenRocksDB { Review comment: done
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r475565395 ## File path: extensions/rocksdb-repos/RocksDatabase.h ## @@ -0,0 +1,98 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include + +#include "utils/OptionalUtils.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/checkpoint.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace internal { + +class RocksDatabase; + +// Not thread safe +class OpenRocksDB { + friend class RocksDatabase; + + OpenRocksDB(gsl::not_null db, gsl::not_null> impl); + + public: + rocksdb::Status Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value); + + rocksdb::Status Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value); + + std::vector MultiGet(const rocksdb::ReadOptions& options, const std::vector& keys, std::vector* values); + + rocksdb::Status Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates); + + rocksdb::Status Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key); + + rocksdb::Status Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value); + + bool GetProperty(const rocksdb::Slice& property, std::string* value); + + rocksdb::Iterator* NewIterator(const rocksdb::ReadOptions& options); Review comment: done ## File path: extensions/rocksdb-repos/FlowFileRepository.cpp ## @@ -71,7 +75,7 @@ void FlowFileRepository::flush() { batch.Delete(keys[i]); } - auto operation = [this, ]() { return db_->Write(rocksdb::WriteOptions(), ); }; + auto operation = [this, , ]() { return opendb->Write(rocksdb::WriteOptions(), ); }; Review comment: done
adamdebreceni commented on a change in pull request #877: URL: https://github.com/apache/nifi-minifi-cpp/pull/877#discussion_r475550882 ## File path: extensions/rocksdb-repos/RocksDbStream.h ## @@ -160,7 +160,7 @@ class RocksDbStream : public io::BaseStream { std::string value_; - rocksdb::DB *db_; + gsl::not_null db_; Review comment: I was under the impression that taking a reference indicates that we are not planning on storing it. Is this not a generally recognized convention?