save-buffer commented on code in PR #13669:
URL: https://github.com/apache/arrow/pull/13669#discussion_r1068674968


##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,16 +45,88 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
-  int64_t row_count() { return row_count_; }
-  size_t batch_count() { return batches_.size(); }
+  void SetBatch(size_t idx, ExecBatch batch);
+  size_t batch_count() const { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  size_t CalculateRowCount() const;
+
+  // Resizes the accumulation queue to contain size batches. The
+  // new batches will be empty and have length 0, but they will be
+  // usable (useful for concurrent modification of the AccumulationQueue
+  // of separate elements).
+  void Resize(size_t size) { batches_.resize(size); }
   void Clear();
-  ExecBatch& operator[](size_t i);
+  ExecBatch& operator[](size_t i) { return batches_[i]; }
+  const ExecBatch& operator[](size_t i) const { return batches_[i]; }
 
  private:
-  int64_t row_count_;
   std::vector<ExecBatch> batches_;
 };
 
-}  // namespace util
+class SpillingAccumulationQueue {
+ public:
+  // Number of partitions must be a power of two, since we assign partitions by
+  // looking at bottom few bits.
+  static constexpr int kLogNumPartitions = 6;
+  static constexpr int kNumPartitions = 1 << kLogNumPartitions;
+  Status Init(QueryContext* ctx);
+  // Assumes that the final column in batch contains 64-bit hashes of the 
columns.
+  Status InsertBatch(size_t thread_index, ExecBatch batch);
+  Status GetPartition(size_t thread_index, size_t partition,
+                      std::function<Status(size_t, size_t, ExecBatch)>
+                          on_batch,  // thread_index, batch_index, batch
+                      std::function<Status(size_t)> on_finished);
+
+  // Returns hashes of the given partition and batch index.
+  // partition MUST be at least hash_cursor, as if partition < hash_cursor,
+  // these hashes will have been deleted.
+  const uint64_t* GetHashes(size_t partition, size_t batch_idx);
+  inline size_t batch_count(size_t partition) const {
+    size_t num_full_batches = partition >= spilling_cursor_
+                                  ? queues_[partition].batch_count()
+                                  : files_[partition].num_batches();
+
+    return num_full_batches + (builders_[partition].num_rows() > 0);
+  }
+  inline size_t row_count(size_t partition, size_t batch_idx) const {
+    if (batch_idx < hash_queues_[partition].batch_count())
+      return hash_queues_[partition][batch_idx].length;
+    else
+      return builders_[partition].num_rows();
+  }
+
+  static inline constexpr size_t partition_id(uint64_t hash) {
+    // Hash Table uses the top bits of the hash, so we really really
+    // need to use the bottom bits of the hash for spilling to avoid
+    // a huge number of hash collisions per partition.
+    return static_cast<size_t>(hash & (kNumPartitions - 1));
+  }
+
+  size_t CalculatePartitionRowCount(size_t partition) const;
+
+  Result<bool> AdvanceSpillCursor();
+  Result<bool> AdvanceHashCursor();
+  inline size_t spill_cursor() const { return spilling_cursor_.load(); }
+  inline size_t hash_cursor() const { return hash_cursor_.load(); }
+
+ private:
+  std::atomic<size_t> spilling_cursor_{0};  // denotes the first in-memory 
partition
+  std::atomic<size_t> hash_cursor_{0};
+
+  QueryContext* ctx_;
+  PartitionLocks partition_locks_;
+
+  AccumulationQueue queues_[kNumPartitions];
+  AccumulationQueue hash_queues_[kNumPartitions];
+
+  ExecBatchBuilder builders_[kNumPartitions];
+

Review Comment:
   Yeah it might make it a little easier in the future if we need to add more 
fields. I changed it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to