pitrou commented on code in PR #44897:
URL: https://github.com/apache/arrow/pull/44897#discussion_r1876172195


##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
   AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
 }
 
+void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+  if (have_false_positive_memory_leak_with_async_close()) {
+    GTEST_SKIP() << "Filesystem have false positive memory leak with generator";
+  }
+#endif
+  auto io_thread_pool =
+      static_cast<arrow::internal::ThreadPool*>(fs->io_context().executor());
+  auto original_threads = io_thread_pool->GetCapacity();
+  // Needs to be smaller than the number of files we test with to catch GH-15233
+  ASSERT_OK(io_thread_pool->SetCapacity(2));
+  // Ensure the thread pool capacity is set back to the original value after the test
+  auto reset_thread_pool = [io_thread_pool, original_threads](void*) {
+    ASSERT_OK(io_thread_pool->SetCapacity(original_threads));
+  };
+  std::unique_ptr<void, decltype(reset_thread_pool)> resetThreadPoolGuard(

Review Comment:
   Nit
   ```suggestion
     std::unique_ptr<void, decltype(reset_thread_pool)> reset_thread_pool_guard(
   ```



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -52,6 +52,24 @@ struct Task {
   Executor::StopCallback stop_callback;
 };
 
+struct QueuedTask {
+  Task task;
+  int32_t priority;
+  uint64_t spawn_index;
+
+  // Implement comparison so that std::priority_queue will pop the low priorities more
+  // urgently.
+  bool operator<(const QueuedTask& other) const {
+    if (priority == other.priority) {
+      // Maintain spawn order for tasks with the same priority. TODO: Decide if this is
+      // really needed. Currently several test cases in arrow-acero-hash-aggregate-test
+      // depend on it.

Review Comment:
   I think this is preferable for deterministic execution, so we can probably remove the TODO.
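
To illustrate the determinism argument, here is a minimal standalone sketch of such a comparison. It is not the PR's code: `Item` and `DrainOrder` are hypothetical names, and the tie-break shown (lower `spawn_index` pops first) is an assumption based on the comment in the hunk, since the body of the real `operator<` is not visible above.

```cpp
#include <cstdint>
#include <queue>
#include <vector>

// Hypothetical stand-in for QueuedTask, without the Task payload.
struct Item {
  int32_t priority;
  uint64_t spawn_index;

  // std::priority_queue pops the "largest" element, so an item compares as
  // smaller when it should run later: higher priority value first, then
  // higher spawn_index for equal priorities.
  bool operator<(const Item& other) const {
    if (priority == other.priority) {
      return spawn_index > other.spawn_index;
    }
    return priority > other.priority;
  }
};

// Pushes one item per priority, in spawn order, and returns the spawn indices
// in the order the queue pops them.
std::vector<uint64_t> DrainOrder(const std::vector<int32_t>& priorities) {
  std::priority_queue<Item> queue;
  uint64_t next_index = 0;
  for (int32_t priority : priorities) {
    queue.push(Item{priority, next_index++});
  }
  std::vector<uint64_t> order;
  while (!queue.empty()) {
    order.push_back(queue.top().spawn_index);
    queue.pop();
  }
  return order;
}
```

For priorities `{1, 0, 1, 0}` this returns spawn indices `{1, 3, 0, 2}`: both priority-0 items come first, and within each priority the spawn order is preserved. Without the tie-break, the relative order of equal-priority items would be left to the heap implementation.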



##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -578,6 +578,57 @@ TEST_F(TestThreadPool, Spawn) {
   SpawnAdds(pool.get(), 7, task_add<int>);
 }
 
+TEST_F(TestThreadPool, TasksRunInPriorityOrder) {
+  auto pool = this->MakeThreadPool(1);
+  constexpr int kNumTasks = 10;
+  auto recorded_times = std::vector<std::chrono::steady_clock::time_point>(kNumTasks);
+  auto futures = std::vector<Future<int>>(kNumTasks);
+  auto sleep_task = []() { SleepABit(); };
+
+  // Spawn a sleep task to block the pool while we add the other tasks. This
+  // ensures all the tasks are queued before any of them start running, so that
+  // their running order is fully determined by their priority.
+  ASSERT_OK(pool->Spawn(sleep_task));

Review Comment:
   This may well fail to fulfill its purpose if run on a slow or busy machine (which unfortunately can happen with public CI infrastructure).
   
   Using a synchronization primitive would be better, something like:
   1) sleep_task waits for a condition
   2) main thread spawns all record_time tasks, which can't run until sleep_task finishes
   3) main thread sets the condition so that sleep_task returns, unblocking the thread pool
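
The three steps above can be sketched with only the standard library. This is a rough illustration under stated assumptions, not Arrow code: `SingleWorkerPool`, `Gate`, and `RunGatedTasks` are hypothetical names, and the pool is a plain FIFO worker standing in for `MakeThreadPool(1)`. Each task records how many tasks had been spawned when it ran, so the result shows whether everything was queued before anything executed.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical one-thread FIFO pool (assumption: stands in for a pool of size 1).
class SingleWorkerPool {
 public:
  SingleWorkerPool() : worker_([this] { Run(); }) {}
  ~SingleWorkerPool() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      shutting_down_ = true;
    }
    cv_.notify_one();
    worker_.join();  // The worker drains all queued tasks before exiting.
  }
  void Spawn(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return shutting_down_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // Shutting down and nothing left to run.
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();
    }
  }
  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool shutting_down_ = false;
  std::thread worker_;  // Declared last so it starts after the other members.
};

// The "condition" from step 1: blocks in Wait() until Open() is called.
class Gate {
 public:
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return open_; });
  }
  void Open() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      open_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool open_ = false;
};

// Returns, for each task, the number of tasks already spawned when it ran.
std::vector<int> RunGatedTasks(int num_tasks) {
  std::vector<int> observed;  // Written only by the worker, read after join.
  std::atomic<int> spawned{0};
  Gate gate;
  {
    SingleWorkerPool pool;
    pool.Spawn([&gate] { gate.Wait(); });  // 1) blocking task waits on the gate
    for (int i = 0; i < num_tasks; ++i) {  // 2) queue everything behind it
      pool.Spawn([&observed, &spawned] { observed.push_back(spawned.load()); });
      ++spawned;
    }
    gate.Open();  // 3) release the blocking task, unblocking the pool
  }               // The pool destructor joins the worker.
  return observed;
}
```

Because the worker cannot get past the gate task until `Open()` is called, every recorded value equals `num_tasks` regardless of how slow or loaded the machine is, which is the property a timed sleep cannot guarantee.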



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.