pitrou commented on code in PR #44897:
URL: https://github.com/apache/arrow/pull/44897#discussion_r1876172195
##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
}
+void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ if (have_false_positive_memory_leak_with_async_close()) {
+ GTEST_SKIP() << "Filesystem have false positive memory leak with generator";
+ }
+#endif
+ auto io_thread_pool =
+ static_cast<arrow::internal::ThreadPool*>(fs->io_context().executor());
+ auto original_threads = io_thread_pool->GetCapacity();
+ // Needs to be smaller than the number of files we test with to catch GH-15233
+ ASSERT_OK(io_thread_pool->SetCapacity(2));
+ // Ensure the thread pool capacity is set back to the original value after the test
+ auto reset_thread_pool = [io_thread_pool, original_threads](void*) {
+ ASSERT_OK(io_thread_pool->SetCapacity(original_threads));
+ };
+ std::unique_ptr<void, decltype(reset_thread_pool)> resetThreadPoolGuard(
Review Comment:
Nit
```suggestion
std::unique_ptr<void, decltype(reset_thread_pool)> reset_thread_pool_guard(
```
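The hunk restores the pool capacity with a `std::unique_ptr` used as a scope guard. Below is a minimal, self-contained sketch of that idiom using the suggested `reset_thread_pool_guard` name. `FakePool` and `RunWithSmallPool` are hypothetical stand-ins, not Arrow APIs. In particular, `SetCapacity` here returns `void` rather than a `Status`.

The hunk is cut off before the guard's constructor arguments. This sketch therefore assumes a non-null dummy pointer is passed, because `std::unique_ptr` only invokes its deleter when the stored pointer is non-null.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for arrow::internal::ThreadPool; only the two
// capacity accessors used by the test are modeled.
class FakePool {
 public:
  int GetCapacity() const { return capacity_; }
  void SetCapacity(int n) { capacity_ = n; }

 private:
  int capacity_ = 8;
};

// Temporarily shrinks the pool. The guard restores the original capacity when
// it goes out of scope, including on an early return (e.g. a failed ASSERT_*).
void RunWithSmallPool(FakePool* pool, int* observed) {
  const int original = pool->GetCapacity();
  pool->SetCapacity(2);
  auto reset_thread_pool = [pool, original](void*) { pool->SetCapacity(original); };
  // unique_ptr skips the deleter for a null pointer, so point the guard at a
  // local dummy object; the deleter never dereferences it.
  int dummy = 0;
  std::unique_ptr<void, decltype(reset_thread_pool)> reset_thread_pool_guard(
      &dummy, reset_thread_pool);
  *observed = pool->GetCapacity();
}
```

A capturing lambda is not default-constructible in C++17, so the deleter has to be passed to the `unique_ptr` constructor explicitly.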
##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -52,6 +52,24 @@ struct Task {
Executor::StopCallback stop_callback;
};
+struct QueuedTask {
+ Task task;
+ int32_t priority;
+ uint64_t spawn_index;
+
+ // Implement comparison so that std::priority_queue will pop the low priorities more
+ // urgently.
+ bool operator<(const QueuedTask& other) const {
+ if (priority == other.priority) {
+ // Maintain spawn order for tasks with the same priority. TODO: Decide if this is
+ // really needed. Currently several test cases in arrow-acero-hash-aggregate-test
+ // depend on it.
Review Comment:
I think this is preferable for deterministic execution, so we can probably
remove the TODO.
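The following sketch shows the ordering the comparator is meant to produce. It assumes, as the comment in the hunk says, that lower `priority` values are popped more urgently and that ties fall back to spawn order. The real `operator<` body is truncated in this hunk. `QueuedTaskSketch` and `DrainInOrder` are hypothetical simplifications in which the `Task` payload is replaced by an `int` id.

```cpp
#include <cassert>
#include <cstdint>
#include <queue>
#include <vector>

// Simplified stand-in for QueuedTask: an int id replaces the Task payload so
// the pop order can be observed directly.
struct QueuedTaskSketch {
  int id;
  int32_t priority;
  uint64_t spawn_index;

  // std::priority_queue pops the *largest* element, so "a < b" must mean
  // "a is less urgent than b": a higher priority value, or on a tie, a later
  // spawn index.
  bool operator<(const QueuedTaskSketch& other) const {
    if (priority == other.priority) {
      return spawn_index > other.spawn_index;
    }
    return priority > other.priority;
  }
};

// Pushes all tasks, then pops them, returning ids in execution order.
std::vector<int> DrainInOrder(const std::vector<QueuedTaskSketch>& tasks) {
  std::priority_queue<QueuedTaskSketch> queue(tasks.begin(), tasks.end());
  std::vector<int> order;
  while (!queue.empty()) {
    order.push_back(queue.top().id);
    queue.pop();
  }
  return order;
}
```

A binary heap is not stable. Without the `spawn_index` tie-break, equal-priority tasks would come out in an unspecified order, which is why keeping it helps deterministic execution.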
##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -578,6 +578,57 @@ TEST_F(TestThreadPool, Spawn) {
SpawnAdds(pool.get(), 7, task_add<int>);
}
+TEST_F(TestThreadPool, TasksRunInPriorityOrder) {
+ auto pool = this->MakeThreadPool(1);
+ constexpr int kNumTasks = 10;
+ auto recorded_times = std::vector<std::chrono::steady_clock::time_point>(kNumTasks);
+ auto futures = std::vector<Future<int>>(kNumTasks);
+ auto sleep_task = []() { SleepABit(); };
+
+ // Spawn a sleep task to block the pool while we add the other tasks. This
+ // ensures all the tasks are queued before any of them start running, so that
+ // their running order is fully determined by their priority.
+ ASSERT_OK(pool->Spawn(sleep_task));
Review Comment:
This may well fail to fulfill its purpose if run on a slow or busy machine
(which unfortunately can happen with public CI infrastructure).
Using a synchronization primitive would be better, something like:
1) sleep_task waits for a condition
2) main thread spawns all record_time tasks, which can't run until sleep_task finishes
3) main thread sets the condition so that sleep_task returns, unblocking the thread pool
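The three steps above could look roughly like the sketch below. `OneWorkerPool` is a hypothetical single-worker pool that runs the lowest priority value first and breaks ties by spawn order. It is written only to keep the example self-contained and is not Arrow's `ThreadPool`. A `std::promise`/`std::shared_future` pair plays the role of the condition.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical one-thread priority pool; the destructor drains the queue.
class OneWorkerPool {
 public:
  OneWorkerPool() : worker_([this] { Loop(); }) {}
  ~OneWorkerPool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stopping_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }
  void Spawn(std::function<void()> fn, int priority) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push(Item{priority, next_index_++, std::move(fn)});
    }
    cv_.notify_one();
  }

 private:
  struct Item {
    int priority;
    std::uint64_t index;
    std::function<void()> fn;
    // Lower priority value first; ties in spawn order.
    bool operator<(const Item& o) const {
      return priority == o.priority ? index > o.index : priority > o.priority;
    }
  };
  void Loop() {
    std::unique_lock<std::mutex> lock(mu_);
    while (true) {
      cv_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
      if (queue_.empty()) return;  // stopping and fully drained
      auto fn = queue_.top().fn;
      queue_.pop();
      lock.unlock();  // run the task without holding the lock
      fn();
      lock.lock();
    }
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::priority_queue<Item> queue_;
  std::uint64_t next_index_ = 0;
  bool stopping_ = false;
  std::thread worker_;  // declared last so it starts after the other members
};

std::vector<int> RunGatedPriorityTest() {
  std::vector<int> order;
  std::promise<void> release;
  std::shared_future<void> gate = release.get_future().share();
  {
    OneWorkerPool pool;
    // 1) The blocking task occupies the only worker until the gate opens.
    //    Spawned first, it also wins the priority-0 tie on spawn order.
    pool.Spawn([gate] { gate.wait(); }, /*priority=*/0);
    // 2) Queue the recording tasks in reverse priority order; none can run yet.
    for (int p = 9; p >= 0; --p) {
      pool.Spawn([&order, p] { order.push_back(p); }, p);
    }
    // 3) Open the gate; the worker then drains the queue by priority.
    release.set_value();
  }  // ~OneWorkerPool joins the worker, so `order` is safe to read afterwards.
  return order;
}
```

Unlike a sleep, the gate makes the result independent of machine load. No recording task can start until every one of them has been queued.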
--
This is an automated message from the Apache Git Service.