zanmato1984 commented on code in PR #43698:
URL: https://github.com/apache/arrow/pull/43698#discussion_r2335291632


##########
cpp/src/arrow/dataset/file_parquet_test.cc:
##########
@@ -133,6 +135,30 @@ class ParquetFormatHelper {
   }
 };
 
+class DelayedBufferReader : public ::arrow::io::BufferReader {
+ public:
+  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
+      : ::arrow::io::BufferReader(buffer) {}
+
+  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
+      const ::arrow::io::IOContext& io_context, int64_t position,
+      int64_t nbytes) override {
+    read_async_count.fetch_add(1);
+    auto self = 
std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
+    return DeferNotOk(::arrow::io::internal::SubmitIO(
+        io_context, [self, position, nbytes]() -> 
Result<std::shared_ptr<Buffer>> {
+          std::this_thread::sleep_for(std::chrono::seconds(1));
+          return self->DoReadAt(position, nbytes);
+        }));
+  }
+
+  std::atomic<int> read_async_count{0};
+};
+
+using OptionsCustomizer =
+    std::function<void(std::shared_ptr<ScanOptions>&,
+                       
std::vector<std::shared_ptr<arrow::internal::ThreadPool>>&)>;

Review Comment:
   1. Make the type name more verbose.
   2. Only one thread pool should do - we don't need a list of them.
   3. Ensure `ScanOptions`'s validity by using reference, instead of shared 
pointer.
   4. Use raw pointer of thread pool to decouple the lifecycle management.



##########
cpp/src/arrow/dataset/file_parquet_test.cc:
##########
@@ -183,6 +209,55 @@ class TestParquetFileFormat : public 
FileFormatFixtureMixin<ParquetFormatHelper>
       EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1);
     }
   }
+
+  void TestMultithreadedRegression(OptionsCustomizer customizer) {
+    // GH-38438: This test is similar to MultithreadedScan, but it try to use 
self
+    // designed Executor and DelayedBufferReader to mock async execution to 
make
+    // the state machine more complex.

Review Comment:
   These comments should probably remain in 
`TestParquetFileFormat.MultithreadedScanRegression` instead of here.



##########
cpp/src/arrow/dataset/file_parquet_test.cc:
##########
@@ -133,6 +135,30 @@ class ParquetFormatHelper {
   }
 };
 
+class DelayedBufferReader : public ::arrow::io::BufferReader {
+ public:
+  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
+      : ::arrow::io::BufferReader(buffer) {}
+
+  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
+      const ::arrow::io::IOContext& io_context, int64_t position,
+      int64_t nbytes) override {
+    read_async_count.fetch_add(1);
+    auto self = 
std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
+    return DeferNotOk(::arrow::io::internal::SubmitIO(
+        io_context, [self, position, nbytes]() -> 
Result<std::shared_ptr<Buffer>> {
+          std::this_thread::sleep_for(std::chrono::seconds(1));
+          return self->DoReadAt(position, nbytes);
+        }));
+  }
+
+  std::atomic<int> read_async_count{0};
+};
+
+using OptionsCustomizer =
+    std::function<void(std::shared_ptr<ScanOptions>&,
+                       
std::vector<std::shared_ptr<arrow::internal::ThreadPool>>&)>;

Review Comment:
   ```suggestion
   using CustomizeScanOptionsWithThreadPool =
       std::function<void(ScanOptions &,
                          arrow::internal::ThreadPool*>;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to