This is an automated email from the ASF dual-hosted git repository. saipranav pushed a commit to branch QueccBranch in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit bcd10cd8bf31742bb22ff909a842cb29b7639016
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Sun Jan 21 17:05:50 2024 -0800

    "Feeding multi_op transactions to threads to process"
---
 executor/kv/quecc_executor.cpp | 53 ++++++++++++++++++++++++++++++++++--------
 executor/kv/quecc_executor.h   |  5 ++++
 2 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index c890af7c..119fd9d6 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -62,6 +62,9 @@ QueccExecutor::QueccExecutor(std::unique_ptr<Storage> storage)
     sorted_transactions_.push_back(
         std::vector<std::vector<KVOperation>>(thread_count_));
     batch_ready_.push_back(false);
+    multi_op_batches_.push_back(std::vector<KVRequest>());
+    multi_op_number_batches_.push_back(std::vector<int>());
+    multi_op_ready_.store(false);
     std::thread planner(&QueccExecutor::PlannerThread, this, thread_number);
@@ -192,6 +195,8 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
   transaction_tracker_.clear();
   operation_list_.clear();
   key_weight_.clear();
+  multi_op_transactions_.clear();
+  multi_op_transactions_numbers_.clear();
   total_weight_=0;
   for (int i = 0; i < (int)batch_array_.size(); i++) {
@@ -210,6 +215,10 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     // printf("txn id: %lu\n", kv_request.txn_id());
     // printf("kv_request size: %d\n", kv_request.ops_size());
     if (kv_request.ops_size()) {
+      if(kv_request.ops_size()>1){
+        multi_op_transactions_.push_back(kv_request);
+        multi_op_transactions_numbers_.push_back(txn_id);
+      }
       transaction_tracker_[txn_id] = kv_request.ops_size();
       for(const auto& op : kv_request.ops()){
         KVOperation newOp;
@@ -259,17 +268,41 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       batch_number++;
     }
   }
-/*
-  // RIDs in hash map are now equal to which range they go into
-  int range_count = 0;
-  int range_size = ((rid_to_range_.size() - 1) / thread_count_) + 1;
-  for (const auto& key : rid_to_range_) {
-    rid_to_range_[key.first] = range_count / range_size;
-    range_count++;
-  }
-*/
   CreateRanges();
-
+
+  int multi_op_split_size = multi_op_transactions_numbers_.size()/thread_count_+1;
+  int count_per_split=0;
+  int op_batch_number=0;
+  //Send multi operation transactions to thread to gauge if they require waits
+  for(int i=0; i<multi_op_transactions_numbers_.size(); i++){
+    multi_op_ready_.store(true);
+    multi_op_batches_[op_batch_number].push_back(multi_op_transactions_[i]);
+    multi_op_number_batches_[op_batch_number].push_back(multi_op_transactions_numbers_[i]);
+    count_per_split++;
+    if(count_per_split>multi_op_split_size){
+      count_per_split=0;
+      op_batch_number++;
+    }
+  }
+
+  if(multi_op_ready_.load()){
+    ready_planner_count_.fetch_add(thread_count_);
+    // Allows planner threads to start consuming
+    for (int i = 0; i < thread_count_; i++) {
+      batch_ready_[i] = true;
+    }
+    cv_.notify_all();
+
+    // Wait for threads to finish to get batch response
+    while (ready_planner_count_.load() != 0) {
+      std::unique_lock<std::mutex> lk2(mutex2_);
+      if (cv2_.wait_for(lk2, std::chrono::microseconds(100),
+                        [this] { return is_stop_; })) {
+        return std::move(this->batch_response_);
+      }
+    }
+  }
+
   ready_planner_count_.fetch_add(thread_count_);
   // Allows planner threads to start consuming
   for (int i = 0; i < thread_count_; i++) {
diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor.h
index 48f7f99c..786ced11 100644
--- a/executor/kv/quecc_executor.h
+++ b/executor/kv/quecc_executor.h
@@ -97,6 +97,11 @@ class QueccExecutor : public TransactionManager {
   atomic<int> ready_planner_count_;
   std::unique_ptr<BatchUserResponse> batch_response_;
   std::unique_ptr<Storage> storage_;
+  vector<KVRequest> multi_op_transactions_;
+  vector<int> multi_op_transactions_numbers_;
+  vector<vector<KVRequest>> multi_op_batches_;
+  vector<vector<int>> multi_op_number_batches_;
+  atomic<bool> multi_op_ready_;
 };
 }  // namespace resdb
