This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit bcd10cd8bf31742bb22ff909a842cb29b7639016
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Sun Jan 21 17:05:50 2024 -0800

    "Feeding multi_op transactions to threads to process"
---
 executor/kv/quecc_executor.cpp | 53 ++++++++++++++++++++++++++++++++++--------
 executor/kv/quecc_executor.h   |  5 ++++
 2 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index c890af7c..119fd9d6 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -62,6 +62,9 @@ QueccExecutor::QueccExecutor(std::unique_ptr<Storage> storage)
     sorted_transactions_.push_back(
         std::vector<std::vector<KVOperation>>(thread_count_));
     batch_ready_.push_back(false);
+    multi_op_batches_.push_back(std::vector<KVRequest>());
+    multi_op_number_batches_.push_back(std::vector<int>());
+    multi_op_ready_.store(false);
 
     std::thread planner(&QueccExecutor::PlannerThread, this, thread_number);
 
@@ -192,6 +195,8 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
   transaction_tracker_.clear();
   operation_list_.clear();
   key_weight_.clear();
+  multi_op_transactions_.clear();
+  multi_op_transactions_numbers_.clear();
   total_weight_=0;
 
   for (int i = 0; i < (int)batch_array_.size(); i++) {
@@ -210,6 +215,10 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     // printf("txn id: %lu\n", kv_request.txn_id());
     // printf("kv_request size: %d\n", kv_request.ops_size());    
     if (kv_request.ops_size()) {
+      if(kv_request.ops_size()>1){
+        multi_op_transactions_.push_back(kv_request);
+        multi_op_transactions_numbers_.push_back(txn_id);
+      }
       transaction_tracker_[txn_id] = kv_request.ops_size();
       for(const auto& op : kv_request.ops()){
         KVOperation newOp;
@@ -259,17 +268,41 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       batch_number++;
     }
   }
-/*
-  // RIDs in hash map are now equal to which range they go into
-  int range_count = 0;
-  int range_size = ((rid_to_range_.size() - 1) / thread_count_) + 1;
-  for (const auto& key : rid_to_range_) {
-    rid_to_range_[key.first] = range_count / range_size;
-    range_count++;
-  }
-*/
   CreateRanges();
-  
+
+  int multi_op_split_size = multi_op_transactions_numbers_.size()/thread_count_+1;
+  int count_per_split=0;
+  int op_batch_number=0;
+  //Send multi operation transactions to thread to gauge if they require waits
+  for(int i=0; i<multi_op_transactions_numbers_.size(); i++){
+    multi_op_ready_.store(true);
+    multi_op_batches_[op_batch_number].push_back(multi_op_transactions_[i]);
+    multi_op_number_batches_[op_batch_number].push_back(multi_op_transactions_numbers_[i]);
+    count_per_split++;
+    if(count_per_split>multi_op_split_size){
+      count_per_split=0;
+      op_batch_number++;
+    }
+  }
+
+  if(multi_op_ready_.load()){
+    ready_planner_count_.fetch_add(thread_count_);
+    // Allows planner threads to start consuming
+    for (int i = 0; i < thread_count_; i++) {
+      batch_ready_[i] = true;
+    }
+    cv_.notify_all();
+
+    // Wait for threads to finish to get batch response
+    while (ready_planner_count_.load() != 0) {
+      std::unique_lock<std::mutex> lk2(mutex2_);
+      if (cv2_.wait_for(lk2, std::chrono::microseconds(100),
+                        [this] { return is_stop_; })) {
+        return std::move(this->batch_response_);
+      }
+    }
+  }
+
   ready_planner_count_.fetch_add(thread_count_);
   // Allows planner threads to start consuming
   for (int i = 0; i < thread_count_; i++) {
diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor.h
index 48f7f99c..786ced11 100644
--- a/executor/kv/quecc_executor.h
+++ b/executor/kv/quecc_executor.h
@@ -97,6 +97,11 @@ class QueccExecutor : public TransactionManager {
   atomic<int> ready_planner_count_;
   std::unique_ptr<BatchUserResponse> batch_response_;
   std::unique_ptr<Storage> storage_;
+  vector<KVRequest> multi_op_transactions_;
+  vector<int> multi_op_transactions_numbers_;
+  vector<vector<KVRequest>> multi_op_batches_;
+  vector<vector<int>> multi_op_number_batches_;
+  atomic<bool> multi_op_ready_;
 };
 
 }  // namespace resdb

Reply via email to