zanmato1984 commented on code in PR #45421:
URL: https://github.com/apache/arrow/pull/45421#discussion_r1979218164
##########
cpp/src/arrow/acero/concurrent_queue_internal.h:
##########
@@ -35,7 +36,12 @@ class ConcurrentQueue {
//
T Pop() {
Review Comment:
Shall we just remove the `Pop()` method, considering there is no usage of it
at all?
##########
cpp/src/arrow/acero/concurrent_queue_internal.h:
##########
@@ -33,9 +34,15 @@ class ConcurrentQueue {
public:
// Pops the last item from the queue. Must be called on a non-empty queue
//
+
T Pop() {
std::unique_lock<std::mutex> lock(mutex_);
- cond_.wait(lock, [&] { return !queue_.empty(); });
+ return PopUnlocked();
+ }
+
+ T WaitAndPop() {
Review Comment:
```suggestion
// Pops the last item from the queue but waits if the queue is empty until new items are
// pushed.
T WaitAndPop() {
```
##########
cpp/src/arrow/acero/util_test.cc:
##########
@@ -184,5 +185,120 @@ TEST(FieldMap, ExtensionTypeHashJoin) {
EXPECT_EQ(i.get(0), 0);
}
+template <typename Queue>
+void ConcurrentQueueBasicTest(Queue& queue) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading enabled";
+#endif
+ ASSERT_TRUE(queue.Empty());
+ queue.Push(1);
+ ASSERT_FALSE(queue.Empty());
+ ASSERT_EQ(queue.Pop(), 1);
+ ASSERT_TRUE(queue.Empty());
+
+ auto fut_pop = std::async(std::launch::async, [&]() { return
queue.WaitAndPop(); });
+ ASSERT_EQ(fut_pop.wait_for(std::chrono::milliseconds(10)),
std::future_status::timeout);
+ queue.Push(2);
+ queue.Push(3);
+ queue.Push(4);
+ ASSERT_EQ(fut_pop.wait_for(std::chrono::milliseconds(10)),
std::future_status::ready);
+ ASSERT_EQ(fut_pop.get(), 2);
+ fut_pop = std::async(std::launch::async, [&]() { return queue.WaitAndPop();
});
+ ASSERT_EQ(fut_pop.wait_for(std::chrono::milliseconds(10)),
std::future_status::ready);
+ ASSERT_EQ(fut_pop.get(), 3);
+ ASSERT_FALSE(queue.Empty());
+ ASSERT_EQ(queue.TryPop(), std::make_optional(4));
+ ASSERT_EQ(queue.TryPop(), std::nullopt);
+ queue.Push(5);
+ ASSERT_FALSE(queue.Empty());
+ ASSERT_EQ(queue.Front(), 5);
+ ASSERT_FALSE(queue.Empty());
+ queue.Clear();
+ ASSERT_TRUE(queue.Empty());
+}
+
+TEST(ConcurrentQueue, BasicTest) {
+ ConcurrentQueue<int> queue;
+ ConcurrentQueueBasicTest(queue);
+}
+
+class BackpressureTestExecNode : public ExecNode {
+ public:
+ BackpressureTestExecNode() : ExecNode(nullptr, {}, {}, nullptr) {}
+ const char* kind_name() const { return "BackpressureTestNode"; }
+ Status InputReceived(ExecNode* input, ExecBatch batch) override {
+ return Status::NotImplemented("Test only node");
+ }
+ Status InputFinished(ExecNode* input, int total_batches) override {
+ return Status::NotImplemented("Test only node");
+ }
+ Status StartProducing() override { return Status::NotImplemented("Test only
node"); }
+
+ protected:
+ Status StopProducingImpl() override {
+ stopped = true;
+ return Status::OK();
+ }
+
+ public:
+ void PauseProducing(ExecNode* output, int32_t counter) override { paused =
true; }
+ void ResumeProducing(ExecNode* output, int32_t counter) override { paused =
false; }
+ bool paused{false};
+ bool stopped{false};
+};
+
+class TestBackpressureControl : public BackpressureControl {
+ public:
+ explicit TestBackpressureControl(BackpressureTestExecNode* testNode)
+ : testNode(testNode) {}
+ virtual void Pause() { testNode->PauseProducing(nullptr, 0); }
+ virtual void Resume() { testNode->ResumeProducing(nullptr, 0); }
+ BackpressureTestExecNode* testNode;
Review Comment:
Please use snake case for this variable and the ones in the following test.
##########
cpp/src/arrow/acero/concurrent_queue_internal.h:
##########
@@ -33,9 +34,15 @@ class ConcurrentQueue {
public:
// Pops the last item from the queue. Must be called on a non-empty queue
//
+
T Pop() {
std::unique_lock<std::mutex> lock(mutex_);
- cond_.wait(lock, [&] { return !queue_.empty(); });
+ return PopUnlocked();
+ }
+
+ T WaitAndPop() {
Review Comment:
Let's add some comments like other methods in this class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]