pitrou commented on code in PR #44897:
URL: https://github.com/apache/arrow/pull/44897#discussion_r1869522078
##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -578,6 +578,55 @@ TEST_F(TestThreadPool, Spawn) {
SpawnAdds(pool.get(), 7, task_add<int>);
}
+TEST_F(TestThreadPool, TasksRunInPriorityOrder) {
+ auto pool = this->MakeThreadPool(1);
+ auto recorded_times = std::vector<std::chrono::system_clock::time_point>(10);
Review Comment:
Let's avoid hardcoding 10 everywhere in this test?
```suggestion
constexpr int kNumTasks = 10;
auto recorded_times =
std::vector<std::chrono::system_clock::time_point>(kNumTasks);
```
(etc.)
##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -449,8 +468,8 @@ static void WorkerLoop(std::shared_ptr<ThreadPool::State> state,
DCHECK_GE(state->tasks_queued_or_running_, 0);
{
- Task task = std::move(state->pending_tasks_.front());
- state->pending_tasks_.pop_front();
+ Task task = std::move(const_cast<Task&>(state->pending_tasks_.top().task));
Review Comment:
AFAICT we can remove the `const_cast` hack by writing:
```suggestion
Task task = std::move(state->pending_tasks_.c.front().task);
```
##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -52,6 +52,24 @@ struct Task {
Executor::StopCallback stop_callback;
};
+struct QueuedTask {
+ Task task;
+ TaskHints hints;
Review Comment:
Storing priority would be sufficient I think.
##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
}
+void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ if (have_false_positive_memory_leak_with_async_close()) {
+ GTEST_SKIP() << "Filesystem have false positive memory leak with generator";
+ }
+#endif
+ auto originalThreads = io::GetIOThreadPoolCapacity();
+ // Needs to be smaller than the number of files we test with to catch GH-15233
+ ASSERT_OK(io::SetIOThreadPoolCapacity(2));
+ // Ensure the thread pool capacity is set back to the original value after the test
+ auto resetThreadPool = [originalThreads](void*) {
+ ASSERT_OK(io::SetIOThreadPoolCapacity(originalThreads));
+ };
+ std::unique_ptr<void, decltype(resetThreadPool)> resetThreadPoolGuard(nullptr,
+ resetThreadPool);
+
+ auto mock_fs = std::make_shared<arrow::fs::internal::MockFileSystem>(
+ std::chrono::system_clock::now());
+ std::shared_ptr<FileSystem> shared_ptr_fs(fs, [](FileSystem*) {});
Review Comment:
Uh, what is this for? Normally you can just use `fs->shared_from_this()`.
##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -578,6 +578,55 @@ TEST_F(TestThreadPool, Spawn) {
SpawnAdds(pool.get(), 7, task_add<int>);
}
+TEST_F(TestThreadPool, TasksRunInPriorityOrder) {
+ auto pool = this->MakeThreadPool(1);
+ auto recorded_times = std::vector<std::chrono::system_clock::time_point>(10);
+ auto futures = std::vector<Future<int>>(10);
+ auto sleep_task = []() { SleepABit(); };
+
+ // Spawn a sleep task to block the pool while we add the other tasks. This
+ // ensures all the tasks are queued before any of them start running, so that
+ // their running order is fully determined by their priority.
+ ASSERT_OK(pool->Spawn(sleep_task));
+
+ for (int i = 0; i < 10; ++i) {
+ auto record_time = [&recorded_times, i]() {
+ recorded_times[i] = std::chrono::system_clock::now();
Review Comment:
We should use `steady_clock` to ensure that we get monotonic time
measurements.
##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
}
+void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ if (have_false_positive_memory_leak_with_async_close()) {
+ GTEST_SKIP() << "Filesystem have false positive memory leak with generator";
+ }
+#endif
+ auto originalThreads = io::GetIOThreadPoolCapacity();
+ // Needs to be smaller than the number of files we test with to catch GH-15233
+ ASSERT_OK(io::SetIOThreadPoolCapacity(2));
+ // Ensure the thread pool capacity is set back to the original value after the test
+ auto resetThreadPool = [originalThreads](void*) {
Review Comment:
Here as well
##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
}
+void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ if (have_false_positive_memory_leak_with_async_close()) {
+ GTEST_SKIP() << "Filesystem have false positive memory leak with generator";
+ }
+#endif
+ auto originalThreads = io::GetIOThreadPoolCapacity();
Review Comment:
Also it seems that we should actually call
`fs->io_context().executor()->capacity()`?
##########
cpp/src/arrow/filesystem/filesystem.cc:
##########
@@ -642,12 +645,31 @@ Status CopyFiles(const std::vector<FileLocator>& sources,
ARROW_ASSIGN_OR_RAISE(auto destination,
destinations[i].filesystem->OpenOutputStream(
destinations[i].path,
metadata));
RETURN_NOT_OK(internal::CopyStream(source, destination, chunk_size, io_context));
- return destination->Close();
+ // Using the blocking Close() here can cause reduced performance and deadlocks because
+ // FileSystem implementations that implement background_writes need to queue and wait
+ // for other IO thread(s). There is a risk that most or all the threads in the IO
+ // thread pool are blocking on a call Close(), leaving no IO threads left to actually
+ // fulfil the background writes.
+ return destination->CloseAsync();
};
- return ::arrow::internal::OptionalParallelFor(
- use_threads, static_cast<int>(sources.size()), std::move(copy_one_file),
- io_context.executor());
+ // Spawn copy_one_file less urgently than default, so that background_writes are done
+ // with higher priority. Otherwise copy_one_file will keep buffering more data in memory
+ // without giving the background_writes any chance to upload the data and drop it from
+ // memory. Therefore, without this large copies would cause OOMs.
Review Comment:
Thanks, taking a look now.
##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
}
+void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ if (have_false_positive_memory_leak_with_async_close()) {
+ GTEST_SKIP() << "Filesystem have false positive memory leak with generator";
+ }
+#endif
+ auto originalThreads = io::GetIOThreadPoolCapacity();
Review Comment:
Please let's keep consistent with the coding conventions (`snake_case` for
local variables)
##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -68,8 +68,8 @@ Status SetIOThreadPoolCapacity(int threads) {
FileInterface::~FileInterface() = default;
Future<> FileInterface::CloseAsync() {
- return DeferNotOk(
- default_io_context().executor()->Submit([this]() { return Close(); }));
+ return DeferNotOk(default_io_context().executor()->Submit(
+ [self = shared_from_this()]() { return self->Close(); }));
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]