kou commented on code in PR #35672:
URL: https://github.com/apache/arrow/pull/35672#discussion_r1218911341


##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -725,8 +726,24 @@ void TestInitialized(const ArrayData& array) {
 }
 
 void SleepFor(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
   std::this_thread::sleep_for(
       std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+#else
+  std::chrono::duration<double> secs_left = 
std::chrono::duration<double>(seconds);

Review Comment:
   ```suggestion
     auto secs_left = std::chrono::duration<double>(seconds);
   ```



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1048,16 +1118,32 @@ class GatingTask::Impl : public 
std::enable_shared_from_this<GatingTask::Impl> {
                                  " seconds) waiting for the gating task to be 
unlocked");
     }
     num_finished_++;
+#else
+    // can't wait here for anything, so make a future to do the waiting
+    num_running_++;
+    auto future = RunTaskFuture();
+    future.Wait();
+    return;

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -725,8 +726,24 @@ void TestInitialized(const ArrayData& array) {
 }
 
 void SleepFor(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
   std::this_thread::sleep_for(
       std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+#else
+  std::chrono::duration<double> secs_left = 
std::chrono::duration<double>(seconds);
+  auto start_time = std::chrono::steady_clock::now();
+  auto end_time = start_time + secs_left;
+  auto now = start_time;
+  while (now < end_time) {

Review Comment:
   ```suggestion
     while (std::chrono::steady_clock::now() < end_time) {
   ```



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1048,16 +1118,32 @@ class GatingTask::Impl : public 
std::enable_shared_from_this<GatingTask::Impl> {
                                  " seconds) waiting for the gating task to be 
unlocked");
     }
     num_finished_++;
+#else
+    // can't wait here for anything, so make a future to do the waiting
+    num_running_++;
+    auto future = RunTaskFuture();
+    future.Wait();

Review Comment:
   Can we just use `BusyWait()` here?



##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1486,13 +1490,24 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
   // should all pass
   auto source = [&]() -> Future<TestInt> {
     auto count = counter++;
+#ifdef ARROW_ENABLE_THREADING
     return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
       gating_task->Task()();
       if (count == 0) {
         return Status::Invalid("X");
       }
       return TestInt(count);
     }));
+#else
+    // if threading is disabled, we can't call Task() as we do below because 
it will
+    // never return and will block everything

Review Comment:
   Should we skip this test instead of adding code here?



##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -845,8 +877,11 @@ TEST_F(TestThreadPoolForkSafety, Basics) {
 }
 
 TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) {
-  // ARROW-15593: race condition in after-fork ThreadPool reinitialization
-  // when SpawnReal() was called from multiple threads in a forked child.
+// ARROW-15593: race condition in after-fork ThreadPool reinitialization
+// when SpawnReal() was called from multiple threads in a forked child.

Review Comment:
   Could you move this after `GTEST_SKIP()`?



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -725,8 +726,24 @@ void TestInitialized(const ArrayData& array) {
 }
 
 void SleepFor(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
   std::this_thread::sleep_for(
       std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
+#else
+  std::chrono::duration<double> secs_left = 
std::chrono::duration<double>(seconds);
+  auto start_time = std::chrono::steady_clock::now();
+  auto end_time = start_time + secs_left;
+  auto now = start_time;
+  while (now < end_time) {
+    bool run_task = 
arrow::internal::SerialExecutor::RunTasksOnAllExecutors(true);
+    now = std::chrono::steady_clock::now();
+    if (!run_task) {
+      // all executors are empty, just sleep for the rest of the time
+      std::this_thread::sleep_for(end_time - now);

Review Comment:
   ```suggestion
         std::this_thread::sleep_for(end_time - 
std::chrono::steady_clock::now());
   ```



##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1530,6 +1547,16 @@ TEST(TestAsyncUtil, ReadaheadFailedWaitForInFlight) {
       // These are our in-flight tasks
       return TestInt(0);
     }));
+#else
+    // if threading is disabled, we can't call Task() as we do below because 
it will
+    // never return and will block everything

Review Comment:
   Should we skip this test instead of adding code here?



##########
cpp/src/arrow/util/future.cc:
##########
@@ -89,7 +90,7 @@ class ConcreteFutureImpl : public FutureImpl {
       case ShouldSchedule::IfUnfinished:
         return !in_add_callback;
       case ShouldSchedule::IfDifferentExecutor:
-        return !callback_record.options.executor->OwnsThisThread();
+        return !(callback_record.options.executor->IsCurrentExecutor());

Review Comment:
   Why do we need this change?
   Can we just disable this feature (or return an error) when threading is disabled?



##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -341,6 +342,7 @@ TEST(SerialExecutor, AsyncGeneratorWithCleanup) {
   // must run before the terminal item is delivered from the iterator.
   bool follow_up_ran = false;
   Iterator<TestInt> iter =
+

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -183,6 +247,105 @@ void SerialExecutor::RunLoop() {
   }
   state_->current_thread = {};
 }
+#else   // ARROW_ENABLE_THREADING
+bool SerialExecutor::RunTasksOnAllExecutors(bool once_only) {

Review Comment:
   Why do we need this?
   Can we disable (or return an error from) the features that need this method?



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -532,6 +695,65 @@ int ThreadPool::DefaultCapacity() {
   return capacity;
 }
 
+#else  // ARROW_ENABLE_THREADING
+ThreadPool::ThreadPool() {

Review Comment:
   Can we return `Status::NotImplemented` from all `ThreadPool` methods instead 
of implementing `ThreadPool` without threading?



##########
cpp/src/arrow/util/thread_pool_test.cc:
##########
@@ -598,8 +603,11 @@ TEST_F(TestThreadPool, OwnsCurrentThread) {
   ASSERT_FALSE(pool->OwnsThisThread());
   ASSERT_FALSE(one_failed);
 }
-

Review Comment:
   Could you revert all needless changes?



##########
cpp/src/arrow/util/future.cc:
##########
@@ -149,17 +150,38 @@ class ConcreteFutureImpl : public FutureImpl {
   }
 
   void DoWait() {
+#ifdef ARROW_ENABLE_THREADING
     std::unique_lock<std::mutex> lock(mutex_);
-

Review Comment:
   Could you revert this needless change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to