westonpace commented on code in PR #13771:
URL: https://github.com/apache/arrow/pull/13771#discussion_r938201199
##########
cpp/src/arrow/compute/exec/asof_join_benchmark.cc:
##########
@@ -49,6 +49,20 @@ static Result<TableStats> MakeTable(const TableGenerationProperties& properties)
return Result<TableStats>({table, rows, rows * row_size});
}
+// As opposed to using table_source, we create a make a ReaderGenerator for the specified
+// table. This allows us to specify a thread pool to isolate the threads used for each
+// source as an anti-deadlocking mechanism.
+static ExecNode* MakeTableSourceNode(std::shared_ptr<arrow::compute::ExecPlan> plan,
Review Comment:
This is a minor nitpick, but you can probably just pass `ExecPlan*` here
since that is all `MakeExecNode` requires. When you pass a `std::shared_ptr` by
value you force a copy of the `shared_ptr` (unless the caller can `std::move`
it, which I don't think is the case here). In general that isn't a big deal
(one atomic increment) but it's good to get into the habit of only taking what
you need.
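For illustration only, a sketch of what that signature could look like, reusing the helper body from this diff (it assumes the same includes and namespace context as the benchmark file, and also folds in the `std::move` nitpicks from the comments below):
```cpp
// Sketch only: take ExecPlan*, since that is all MakeExecNode needs.
// Ownership of the plan stays with the caller.
static ExecNode* MakeTableSourceNode(arrow::compute::ExecPlan* plan,
                                     std::shared_ptr<Table> table, int batch_size,
                                     arrow::internal::ThreadPool* thread_pool) {
  // Grab the schema before the table is moved into the reader.
  std::shared_ptr<Schema> schema = table->schema();
  auto reader = std::make_shared<TableBatchReader>(std::move(table));
  reader->set_chunksize(batch_size);
  auto batch_gen = *arrow::compute::MakeReaderGenerator(std::move(reader), thread_pool);
  return *arrow::compute::MakeExecNode(
      "source", plan, {},
      arrow::compute::SourceNodeOptions(std::move(schema), std::move(batch_gen)));
}

// Call sites then pass the raw pointer, e.g.:
//   MakeTableSourceNode(plan.get(), left_table_stats.table, batch_size, left_thread_pool.get());
```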
##########
cpp/src/arrow/compute/exec/asof_join_benchmark.cc:
##########
@@ -77,14 +91,20 @@ static void TableJoinOverhead(benchmark::State& state,
state.PauseTiming();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
ExecPlan::Make(&ctx));
-  std::vector<ExecNode*> input_nodes = {*arrow::compute::MakeExecNode(
-      "table_source", plan.get(), {},
-      arrow::compute::TableSourceNodeOptions(left_table_stats.table, batch_size))};
+  // Each source (for each table) is dedicated its own thread pool to prevent
+  // deadlocking.
+  std::vector<std::shared_ptr<arrow::internal::ThreadPool>> right_thread_pools;
+  std::shared_ptr<arrow::internal::ThreadPool> left_thread_pool =
+      *arrow::internal::ThreadPool::MakeEternal(1);
+  std::vector<ExecNode*> input_nodes = {MakeTableSourceNode(
+      plan, left_table_stats.table, batch_size, left_thread_pool.get())};
   input_nodes.reserve(right_input_tables.size() + 1);
   for (TableStats table_stats : right_input_tables) {
-    input_nodes.push_back(*arrow::compute::MakeExecNode(
-        "table_source", plan.get(), {},
-        arrow::compute::TableSourceNodeOptions(table_stats.table, batch_size)));
+    std::shared_ptr<arrow::internal::ThreadPool> right_thread_pool =
+        *arrow::internal::ThreadPool::MakeEternal(1);
+    right_thread_pools.push_back(right_thread_pool);
+    input_nodes.push_back(MakeTableSourceNode(plan, table_stats.table, batch_size,
+                                              right_thread_pool.get()));
Review Comment:
```suggestion
    input_nodes.push_back(MakeTableSourceNode(plan, table_stats.table, batch_size,
                                              right_thread_pool.get()));
    right_thread_pools.push_back(std::move(right_thread_pool));
```
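For context, the reorder lets the loop-local `shared_ptr` be moved into the vector after its raw pointer has been used, saving one ref-count increment per table. A minimal self-contained sketch of that pattern (`Pool` and `UsePool` are made-up names, not from this PR):
```cpp
#include <memory>
#include <utility>
#include <vector>

struct Pool {};
void UsePool(Pool*) {}

int main() {
  std::vector<std::shared_ptr<Pool>> pools;
  auto pool = std::make_shared<Pool>();
  UsePool(pool.get());               // use the raw pointer while we still own it
  pools.push_back(std::move(pool));  // then transfer ownership; no atomic increment
}
```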
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -249,12 +263,23 @@ class InputState {
return updated;
}
-  void Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
+  Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
     if (rb->num_rows() > 0) {
+      int64_t batch_earliest_time =
+          rb->column_data(time_col_index_)->GetValues<int64_t>(1)[0];
+      int64_t batch_latest_time =
+          rb->column_data(time_col_index_)->GetValues<int64_t>(1)[rb->num_rows() - 1];
+      // Batches must be in order
+      if (batch_earliest_time < latest_time_) {
+        ARROW_RETURN_NOT_OK(Status::Invalid("Batches out of order."));
Review Comment:
```suggestion
        return Status::Invalid("Batches out of order.");
```
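For context, `ARROW_RETURN_NOT_OK(expr)` evaluates `expr` and returns early only when the resulting `Status` is not OK, so wrapping a `Status::Invalid(...)` that is always an error adds nothing over a plain `return`. A tiny self-contained sketch of the two spellings (`CheckOrdered` is a hypothetical helper, not from this PR):
```cpp
#include <cstdint>

#include <arrow/status.h>

using arrow::Status;

// Hypothetical helper, for illustration only.
Status CheckOrdered(int64_t batch_earliest_time, int64_t latest_time) {
  if (batch_earliest_time < latest_time) {
    // ARROW_RETURN_NOT_OK(Status::Invalid("Batches out of order."));  // works, but indirect
    return Status::Invalid("Batches out of order.");  // states the intent directly
  }
  return Status::OK();
}
```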
##########
cpp/src/arrow/compute/exec/asof_join_benchmark.cc:
##########
@@ -49,6 +49,20 @@ static Result<TableStats> MakeTable(const TableGenerationProperties& properties)
return Result<TableStats>({table, rows, rows * row_size});
}
+// As opposed to using table_source, we create a make a ReaderGenerator for the specified
+// table. This allows us to specify a thread pool to isolate the threads used for each
+// source as an anti-deadlocking mechanism.
Review Comment:
```suggestion
// As opposed to using table_source, we create a ReaderGenerator for the specified
// table. This allows us to specify a thread pool to isolate the threads used for each
// source as an anti-deadlocking mechanism.
```
##########
cpp/src/arrow/compute/exec/asof_join_benchmark.cc:
##########
@@ -49,6 +49,20 @@ static Result<TableStats> MakeTable(const TableGenerationProperties& properties)
return Result<TableStats>({table, rows, rows * row_size});
}
+// As opposed to using table_source, we create a make a ReaderGenerator for the specified
+// table. This allows us to specify a thread pool to isolate the threads used for each
+// source as an anti-deadlocking mechanism.
+static ExecNode* MakeTableSourceNode(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                                     std::shared_ptr<Table> table, int batch_size,
+                                     arrow::internal::ThreadPool* thread_pool) {
+  std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
Review Comment:
```suggestion
  std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(std::move(table));
```
Another nitpick to avoid a shared_ptr copy.
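For context, a self-contained sketch of the by-value "sink" pattern this relies on: the parameter is taken by value, so the caller decides between copying and moving (`Data` and `Holder` are made-up types, not from this PR):
```cpp
#include <memory>
#include <utility>

struct Data {};

// Takes its argument by value and moves it into place; the same idea as
// passing std::move(table) into the TableBatchReader constructor above.
struct Holder {
  explicit Holder(std::shared_ptr<Data> data) : data_(std::move(data)) {}
  std::shared_ptr<Data> data_;
};

int main() {
  auto data = std::make_shared<Data>();
  Holder copied(data);            // copy: one atomic ref-count increment
  Holder moved(std::move(data));  // move: no ref-count traffic; `data` is now null
}
```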
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -46,52 +46,66 @@ typedef uint64_t row_index_t;
typedef int col_index_t;
/**
- * Simple implementation for an unbound concurrent queue
+ * Simple implementation for an bounded concurrent queue
*/
template <class T>
-class ConcurrentQueue {
+class ConcurrentBoundedQueue {
+ size_t _remaining;
+ std::vector<T> _buffer;
+ mutable std::mutex _gate;
+ std::condition_variable _not_full;
+ std::condition_variable _not_empty;
+
+ size_t _next_push = 0;
+ size_t _next_pop = 0;
Review Comment:
```suggestion
size_t remaining_;
std::vector<T> buffer_;
mutable std::mutex gate_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
size_t next_push_ = 0;
size_t next_pop_ = 0;
```
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -46,52 +46,66 @@ typedef uint64_t row_index_t;
typedef int col_index_t;
/**
- * Simple implementation for an unbound concurrent queue
+ * Simple implementation for an bounded concurrent queue
Review Comment:
```suggestion
* Simple implementation for a bounded concurrent queue
```
##########
cpp/src/arrow/compute/exec/asof_join_benchmark.cc:
##########
@@ -49,6 +49,20 @@ static Result<TableStats> MakeTable(const TableGenerationProperties& properties)
return Result<TableStats>({table, rows, rows * row_size});
}
+// As opposed to using table_source, we create a make a ReaderGenerator for the specified
+// table. This allows us to specify a thread pool to isolate the threads used for each
+// source as an anti-deadlocking mechanism.
+static ExecNode* MakeTableSourceNode(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                                     std::shared_ptr<Table> table, int batch_size,
+                                     arrow::internal::ThreadPool* thread_pool) {
+  std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+  reader->set_chunksize(batch_size);
+  auto batch_gen = *arrow::compute::MakeReaderGenerator(std::move(reader), thread_pool);
+  return *arrow::compute::MakeExecNode(
+      "source", plan.get(), {},
+      arrow::compute::SourceNodeOptions(table->schema(), batch_gen));
Review Comment:
```suggestion
      arrow::compute::SourceNodeOptions(table->schema(), std::move(batch_gen)));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]