szaszm commented on code in PR #1508: URL: https://github.com/apache/nifi-minifi-cpp/pull/1508#discussion_r1165298009
########## extensions/rocksdb-repos/FlowFileRepository.cpp: ########## @@ -41,52 +41,70 @@ void FlowFileRepository::flush() { return; } auto batch = opendb->createWriteBatch(); - rocksdb::ReadOptions options; - std::vector<std::shared_ptr<FlowFile>> purgeList; + std::list<ExpiredFlowFileInfo> flow_files; - std::vector<rocksdb::Slice> keys; - std::list<std::string> keystrings; - std::vector<std::string> values; - - while (keys_to_delete.size_approx() > 0) { - std::string key; - if (keys_to_delete.try_dequeue(key)) { - keystrings.push_back(std::move(key)); // rocksdb::Slice doesn't copy the string, only grabs ptrs. Hacky, but have to ensure the required lifetime of the strings. - keys.push_back(keystrings.back()); + while (keys_to_delete_.size_approx() > 0) { + ExpiredFlowFileInfo info; + if (keys_to_delete_.try_dequeue(info)) { + flow_files.push_back(std::move(info)); } } - auto multistatus = opendb->MultiGet(options, keys, &values); - for (size_t i = 0; i < keys.size() && i < values.size() && i < multistatus.size(); ++i) { - if (!multistatus[i].ok()) { - logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data()); - keystrings.remove(keys[i].data()); - continue; - } + deserializeFlowFilesWithNoContentClaim(opendb.value(), flow_files); - utils::Identifier containerId; - auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, containerId); - if (eventRead) { - purgeList.push_back(eventRead); - } - logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath()); - batch.Delete(keys[i]); + for (auto& ff : flow_files) { + batch.Delete(ff.key); + logger_->log_debug("Issuing batch delete, including %s, Content path %s", ff.key, ff.content ? ff.content->getContentFullPath() : "null"); } auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); }; if (!ExecuteWithRetry(operation)) { - for (const auto& key : keystrings) { - keys_to_delete.enqueue(key); // Push back the values that we could get but couldn't delete + for (auto&& ff : flow_files) { + keys_to_delete_.enqueue(std::move(ff)); } return; // Stop here - don't delete from content repo while we have records in FF repo } if (content_repo_) { - for (const auto &ffr : purgeList) { - auto claim = ffr->getResourceClaim(); - if (claim) claim->decreaseFlowFileRecordOwnedCount(); + for (auto& ff : flow_files) { + if (ff.content) { + ff.content->decreaseFlowFileRecordOwnedCount(); + } + } + } +} + +void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files) { + std::vector<rocksdb::Slice> keys; + std::vector<std::list<ExpiredFlowFileInfo>::iterator> key_positions; + for (auto it = flow_files.begin(); it != flow_files.end(); ++it) { + if (!it->content) { + keys.push_back(it->key); + key_positions.push_back(it); + } + } + if (!keys.empty()) { Review Comment: I would flip this condition, make an early return, and reduce the indentation level of the code below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org