kou commented on code in PR #35672:
URL: https://github.com/apache/arrow/pull/35672#discussion_r1218911341
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -725,8 +726,24 @@ void TestInitialized(const ArrayData& array) {
}
void SleepFor(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
std::this_thread::sleep_for(
std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+#else
+ std::chrono::duration<double> secs_left =
std::chrono::duration<double>(seconds);
Review Comment:
```suggestion
auto secs_left = std::chrono::duration<double>(seconds);
```
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1048,16 +1118,32 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
" seconds) waiting for the gating task to be
unlocked");
}
num_finished_++;
+#else
+ // can't wait here for anything, so make a future to do the waiting
+ num_running_++;
+ auto future = RunTaskFuture();
+ future.Wait();
+ return;
Review Comment:
```suggestion
```
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -725,8 +726,24 @@ void TestInitialized(const ArrayData& array) {
}
void SleepFor(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
std::this_thread::sleep_for(
std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+#else
+ std::chrono::duration<double> secs_left =
std::chrono::duration<double>(seconds);
+ auto start_time = std::chrono::steady_clock::now();
+ auto end_time = start_time + secs_left;
+ auto now = start_time;
+ while (now < end_time) {
Review Comment:
```suggestion
while (std::chrono::steady_clock::now() < end_time) {
```
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1048,16 +1118,32 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
" seconds) waiting for the gating task to be
unlocked");
}
num_finished_++;
+#else
+ // can't wait here for anything, so make a future to do the waiting
+ num_running_++;
+ auto future = RunTaskFuture();
+ future.Wait();
Review Comment:
Can we just use `BusyWait()` here?
##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1486,13 +1490,24 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
// should all pass
auto source = [&]() -> Future<TestInt> {
auto count = counter++;
+#ifdef ARROW_ENABLE_THREADING
return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
gating_task->Task()();
if (count == 0) {
return Status::Invalid("X");
}
return TestInt(count);
}));
+#else
+ // if threading is disabled, we can't call Task() as we do below because
it will
+ // never return and will block everything
Review Comment:
Should we skip this test instead of adding codes here?
##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -845,8 +877,11 @@ TEST_F(TestThreadPoolForkSafety, Basics) {
}
TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) {
- // ARROW-15593: race condition in after-fork ThreadPool reinitialization
- // when SpawnReal() was called from multiple threads in a forked child.
+// ARROW-15593: race condition in after-fork ThreadPool reinitialization
+// when SpawnReal() was called from multiple threads in a forked child.
Review Comment:
Could you move this after `GTEST_SKIP()`?
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -725,8 +726,24 @@ void TestInitialized(const ArrayData& array) {
}
void SleepFor(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
std::this_thread::sleep_for(
std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+#else
+ std::chrono::duration<double> secs_left =
std::chrono::duration<double>(seconds);
+ auto start_time = std::chrono::steady_clock::now();
+ auto end_time = start_time + secs_left;
+ auto now = start_time;
+ while (now < end_time) {
+ bool run_task =
arrow::internal::SerialExecutor::RunTasksOnAllExecutors(true);
+ now = std::chrono::steady_clock::now();
+ if (!run_task) {
+ // all executors are empty, just sleep for the rest of the time
+ std::this_thread::sleep_for(end_time - now);
Review Comment:
```suggestion
std::this_thread::sleep_for(end_time -
std::chrono::steady_clock::now());
```
##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1530,6 +1547,16 @@ TEST(TestAsyncUtil, ReadaheadFailedWaitForInFlight) {
// These are our in-flight tasks
return TestInt(0);
}));
+#else
+ // if threading is disabled, we can't call Task() as we do below because
it will
+ // never return and will block everything
Review Comment:
Should we skip this test instead of adding codes here?
##########
cpp/src/arrow/util/future.cc:
##########
@@ -89,7 +90,7 @@ class ConcreteFutureImpl : public FutureImpl {
case ShouldSchedule::IfUnfinished:
return !in_add_callback;
case ShouldSchedule::IfDifferentExecutor:
- return !callback_record.options.executor->OwnsThisThread();
+ return !(callback_record.options.executor->IsCurrentExecutor());
Review Comment:
Why do we need this change?
Can we just disable (or error) this feature without threading?
##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -341,6 +342,7 @@ TEST(SerialExecutor, AsyncGeneratorWithCleanup) {
// must run before the terminal item is delivered from the iterator.
bool follow_up_ran = false;
Iterator<TestInt> iter =
+
Review Comment:
```suggestion
```
##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -183,6 +247,105 @@ void SerialExecutor::RunLoop() {
}
state_->current_thread = {};
}
+#else // ARROW_ENABLE_THREADING
+bool SerialExecutor::RunTasksOnAllExecutors(bool once_only) {
Review Comment:
Why do we need this?
Can we disable (or error) features that need this method?
##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -532,6 +695,65 @@ int ThreadPool::DefaultCapacity() {
return capacity;
}
+#else // ARROW_ENABLE_THREADING
+ThreadPool::ThreadPool() {
Review Comment:
Can we return `Status::NotImplemented` from all `ThreadPool` methods instead
of implementing `ThreadPool` without threading?
##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -598,8 +603,11 @@ TEST_F(TestThreadPool, OwnsCurrentThread) {
ASSERT_FALSE(pool->OwnsThisThread());
ASSERT_FALSE(one_failed);
}
-
Review Comment:
Could you revert all needless changes?
##########
cpp/src/arrow/util/future.cc:
##########
@@ -149,17 +150,38 @@ class ConcreteFutureImpl : public FutureImpl {
}
void DoWait() {
+#ifdef ARROW_ENABLE_THREADING
std::unique_lock<std::mutex> lock(mutex_);
-
Review Comment:
Could you revert needless change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]