pitrou commented on a change in pull request #10233:
URL: https://github.com/apache/arrow/pull/10233#discussion_r629378247



##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -512,6 +514,74 @@ TEST_F(TestThreadPool, Submit) {
   }
 }
 
+TEST_F(TestThreadPool, ParallelSummationWithPerThreadState) {
+  // Sum all integers in [0, 1000000) in parallel.
+  struct {
+    void WriteInto(std::vector<int64_t>* addends) {
+      constexpr int kBatchSize = 1000;
+
+      addends->resize(kBatchSize);
+      std::iota(addends->begin(), addends->end(), 
begin_.fetch_add(kBatchSize));
+
+      expected_sum_ += (addends->front() + addends->back()) * kBatchSize / 2;
+    }
+
+    std::atomic<int64_t> begin_{0}, expected_sum_{0};
+  } addend_source;
+
+  constexpr int kThreadPoolCapacity = 5;
+  constexpr int kBatchCount = 1000;
+
+  auto pool = this->MakeThreadPool(kThreadPoolCapacity);
+
+  // Each thread will have a unique local sum
+  auto local_sums = BorrowSet<int64_t>::Make(pool.get());
+
+  // Each task needs a vector into which addends can be written by a source.
+  // Instead of allocating a vector in each task, we can provide a BorrowSet
+  // to allow reuse of a vector.
+  auto local_addend_batches = 
BorrowSet<std::vector<int64_t>>::Make(pool.get());
+
+  std::atomic<int64_t> sum{0}, state_count{0};
+  local_sums->OnDone([&](int64_t&& local_sum) {
+    // When we're done with a local sum, add it into the global sum.
+    sum += local_sum;
+
+    // Track the total number of constructed states; should == thread count
+    ++state_count;
+  });
+
+  std::vector<Future<>> futures(kBatchCount);
+
+  for (int i = 0; i < kBatchCount; ++i) {
+    ASSERT_OK_AND_ASSIGN(futures[i], pool->Submit([&, local_sums, 
local_addend_batches] {
+      // Acquire thread local state. Each task may safely mutate since tasks
+      // running in another thread will acquire a different local_sum
+      auto local_sum = local_sums->BorrowOne();
+
+      auto addends = local_addend_batches->BorrowOne();
+      addend_source.WriteInto(addends.get());

Review comment:
       I must admit I don't understand why this is creating a chunk of addends 
in each thread. I understand it's a test so it doesn't matter much, but it 
still looks a bit gratuitous.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to