This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bf737b12530 [Improvement](local shuffle) Improve local shuffle strategy (#41789)
bf737b12530 is described below
commit bf737b12530301cd3e2dd6d50918cc3051e76a7c
Author: Gabriel <[email protected]>
AuthorDate: Fri Oct 25 10:20:23 2024 +0800
[Improvement](local shuffle) Improve local shuffle strategy (#41789)
Add a local shuffle to unpartitioned fragments to increase parallelism and improve
performance, e.g. for the following query:
```sql
SELECT h1.UserID, h2.URL, COUNT(*) AS visit_count
FROM (
SELECT *
FROM hits_10m
LIMIT 5000
) AS h1
CROSS JOIN (
SELECT *
FROM hits_10m
LIMIT 5000
) AS h2
GROUP BY h1.UserID, h2.URL
ORDER BY visit_count DESC
LIMIT 1000
```
Add a rule to decide where to apply a local exchanger:
```
┌───────────────────────┐               ┌───────────────────────┐
│                       │               │                       │
│Exchange(UNPARTITIONED)│               │Exchange(UNPARTITIONED)│
│                       │               │                       │
└───────────────────────┴──────┬────────┴───────────────────────┘
                               │
                               │
                               │
                        ┌──────▼──────┐
                        │             │
                        │ CROSS JOIN  │
                        │             │
                        └──────┬──────┘
                               │
                               │
            ┌──────────────────▼─────────────────────┐
            │                                        │
            │ LOCAL EXCHANGE (HASH PARTITION) 1 -> n │
            │                                        │
            └──────────────────┬─────────────────────┘
                               │
                               │
                            ┌──▼────┐
                            │       │
                            │  AGG  │
                            │       │
                            └───────┘
```
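Roughly, the rule only adds a local exchange when it can actually raise parallelism.
A minimal, hedged sketch of that decision (simplified from the
`Pipeline::need_to_local_exchange()` change in this diff; the `Op`/`Pipe` types below
are illustrative stand-ins, not the real pipeline classes):
```cpp
// Hedged sketch only: stand-in types, not the classes in be/src/pipeline.
#include <algorithm>
#include <cstddef>
#include <vector>

struct Op {
    bool is_serial = false;  // e.g. finalized agg without grouping keys, UNPARTITIONED exchange
};

struct Pipe {
    std::vector<Op> operators;  // source first, downstream operators after it
    Op sink;
};

// Decide whether a local exchange inserted after operator `idx` can raise parallelism.
bool may_add_local_exchange(const Pipe& p, std::size_t idx) {
    auto serial = [](const Op& op) { return op.is_serial; };

    // A serial operator downstream of `idx` pins the tail of the pipeline to one
    // task anyway, so adding parallelism here would be wasted.
    if (std::any_of(p.operators.begin() + idx, p.operators.end(), serial)) {
        return false;
    }
    // Every operator so far is serial (e.g. an unpartitioned exchange source):
    // a local exchange helps only if the sink itself can run in parallel.
    if (std::all_of(p.operators.begin(), p.operators.end(), serial)) {
        if (!p.sink.is_serial) {
            return true;
        }
    } else if (std::any_of(p.operators.begin(), p.operators.end(), serial)) {
        // Mixed serial and parallel operators: re-shuffle locally so the parallel
        // part is not throttled by the serial one.
        return true;
    }
    // Otherwise fall back to comparing the pipeline's current data distribution
    // with the required one (hash vs. bucket-hash shuffle, partition exprs, ...),
    // which is unchanged from the previous behaviour and omitted here.
    return false;
}
```
In the example above, this is what allows the HASH PARTITION local exchange between the
CROSS JOIN and the AGG, producing the speedup shown below.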
before: 1 min 17.79 sec
after: 16.73 sec
---
be/src/pipeline/dependency.h | 8 +-
be/src/pipeline/exec/aggregation_sink_operator.cpp | 5 +-
be/src/pipeline/exec/aggregation_sink_operator.h | 7 +-
.../pipeline/exec/aggregation_source_operator.cpp | 4 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 4 +-
be/src/pipeline/exec/analytic_source_operator.cpp | 1 +
be/src/pipeline/exec/assert_num_rows_operator.cpp | 1 +
.../distinct_streaming_aggregation_operator.cpp | 4 +-
.../exec/distinct_streaming_aggregation_operator.h | 4 +
be/src/pipeline/exec/exchange_sink_operator.h | 1 +
be/src/pipeline/exec/join_build_sink_operator.cpp | 2 +
be/src/pipeline/exec/join_probe_operator.cpp | 1 +
.../exec/nested_loop_join_probe_operator.h | 4 +-
be/src/pipeline/exec/operator.cpp | 9 +-
be/src/pipeline/exec/operator.h | 6 +-
.../partitioned_aggregation_source_operator.cpp | 4 +
.../exec/partitioned_aggregation_source_operator.h | 2 +
be/src/pipeline/exec/sort_sink_operator.cpp | 4 +-
be/src/pipeline/exec/sort_sink_operator.h | 3 +-
be/src/pipeline/exec/sort_source_operator.cpp | 4 +-
be/src/pipeline/exec/union_source_operator.h | 4 +-
.../local_exchange_sink_operator.cpp | 6 +-
.../local_exchange/local_exchange_sink_operator.h | 2 +-
be/src/pipeline/pipeline.cpp | 43 +++++++-
be/src/pipeline/pipeline.h | 44 ++++----
be/src/pipeline/pipeline_fragment_context.cpp | 120 +++++++++++++--------
be/src/pipeline/pipeline_fragment_context.h | 6 +-
.../org/apache/doris/planner/AggregationNode.java | 5 +
.../org/apache/doris/planner/AnalyticEvalNode.java | 5 +
.../apache/doris/planner/AssertNumRowsNode.java | 5 +
.../org/apache/doris/planner/DataPartition.java | 4 +
.../org/apache/doris/planner/EmptySetNode.java | 4 +
.../org/apache/doris/planner/ExchangeNode.java | 10 ++
.../org/apache/doris/planner/JoinNodeBase.java | 1 -
.../apache/doris/planner/NestedLoopJoinNode.java | 17 +++
.../org/apache/doris/planner/PlanFragment.java | 35 ++++++
.../java/org/apache/doris/planner/PlanNode.java | 10 ++
.../java/org/apache/doris/planner/RepeatNode.java | 5 +
.../java/org/apache/doris/planner/ScanNode.java | 5 +
.../java/org/apache/doris/planner/SelectNode.java | 5 +
.../java/org/apache/doris/planner/SortNode.java | 5 +
.../java/org/apache/doris/planner/UnionNode.java | 5 +
.../main/java/org/apache/doris/qe/Coordinator.java | 22 ++++
gensrc/thrift/PlanNodes.thrift | 1 +
gensrc/thrift/Planner.thrift | 4 +
.../insert_into_table/complex_insert.groovy | 6 +-
.../distribute/local_shuffle.groovy | 12 +--
47 files changed, 361 insertions(+), 108 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 0885dbf380f..8060ee8362d 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -111,19 +111,19 @@ public:
// Notify downstream pipeline tasks this dependency is ready.
void set_ready();
void set_ready_to_read() {
- DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+ DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
_shared_state->source_deps.front()->set_ready();
}
void set_block_to_read() {
- DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+ DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
_shared_state->source_deps.front()->block();
}
void set_ready_to_write() {
- DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+ DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
_shared_state->sink_deps.front()->set_ready();
}
void set_block_to_write() {
- DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+ DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
_shared_state->sink_deps.front()->block();
}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 4007f50f58a..5fb14c02585 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -717,7 +717,10 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int
operator_id, const TPla
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution),
- _agg_fn_output_row_descriptor(descs, tnode.row_tuples,
tnode.nullable_tuples) {}
+ _agg_fn_output_row_descriptor(descs, tnode.row_tuples,
tnode.nullable_tuples),
+ _without_key(tnode.agg_node.grouping_exprs.empty()) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 1f846ec88ff..8271f1451b4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,9 +143,8 @@ public:
DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
- return _needs_finalize ||
DataSinkOperatorX<AggSinkLocalState>::_child
- ->ignore_data_distribution()
- ? DataDistribution(ExchangeType::PASSTHROUGH)
+ return _needs_finalize
+ ? DataDistribution(ExchangeType::NOOP)
:
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_operator
@@ -204,8 +203,8 @@ protected:
const std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
const bool _require_bucket_distribution;
-
RowDescriptor _agg_fn_output_row_descriptor;
+ const bool _without_key;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index c68601fcdca..6d4cd291079 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -441,7 +441,9 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: Base(pool, tnode, operator_id, descs),
_needs_finalize(tnode.agg_node.need_finalize),
- _without_key(tnode.agg_node.grouping_exprs.empty()) {}
+ _without_key(tnode.agg_node.grouping_exprs.empty()) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos) {
auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 1da5c1f7c35..afe9aeab8fd 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -201,7 +201,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool*
pool, int operator_id,
_require_bucket_distribution(require_bucket_distribution),
_partition_exprs(tnode.__isset.distribute_expr_lists &&
require_bucket_distribution
? tnode.distribute_expr_lists[0]
- : tnode.analytic_node.partition_exprs) {}
+ : tnode.analytic_node.partition_exprs) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 134a0ad82d7..019f95042c2 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -475,6 +475,7 @@
AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNo
_has_range_window(tnode.analytic_node.window.type ==
TAnalyticWindowType::RANGE),
_has_window_start(tnode.analytic_node.window.__isset.window_start),
_has_window_end(tnode.analytic_node.window.__isset.window_end) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
_fn_scope = AnalyticFnScope::PARTITION;
if (tnode.analytic_node.__isset.window &&
tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index c1a02b6f838..345e42b7d96 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -27,6 +27,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool*
pool, const TPlanNode
: StreamingOperatorX<AssertNumRowsLocalState>(pool, tnode,
operator_id, descs),
_desired_num_rows(tnode.assert_num_rows_node.desired_num_rows),
_subquery_string(tnode.assert_num_rows_node.subquery_string) {
+ _is_serial_operator = true;
if (tnode.assert_num_rows_node.__isset.assertion) {
_assertion = tnode.assert_num_rows_node.assertion;
} else {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index ddec533a9ff..a59af8ce7b4 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -326,7 +326,9 @@
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
? tnode.distribute_expr_lists[0]
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate),
- _require_bucket_distribution(require_bucket_distribution) {
+ _require_bucket_distribution(require_bucket_distribution),
+ _without_key(tnode.agg_node.grouping_exprs.empty()) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 71d289402ec..1f7a21190ad 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -104,6 +104,9 @@ public:
bool need_more_input_data(RuntimeState* state) const override;
DataDistribution required_data_distribution() const override {
+ if (_needs_finalize && _probe_expr_ctxs.empty()) {
+ return {ExchangeType::NOOP};
+ }
if (_needs_finalize || (!_probe_expr_ctxs.empty() &&
!_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_operator
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
@@ -136,6 +139,7 @@ private:
/// The total size of the row from the aggregate functions.
size_t _total_size_of_aggregate_states = 0;
bool _is_streaming_preagg = false;
+ const bool _without_key;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 8af944728a2..689172dfc6b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -224,6 +224,7 @@ public:
Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block*
src, PBlock* dest,
int num_receivers = 1);
DataDistribution required_data_distribution() const override;
+ bool is_serial_operator() const override { return true; }
private:
friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp
b/be/src/pipeline/exec/join_build_sink_operator.cpp
index a1f3262d6ed..fc0d3b87460 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -82,6 +82,8 @@
JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool,
_short_circuit_for_null_in_build_side(_join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join),
_runtime_filter_descs(tnode.runtime_filters) {
+ DataSinkOperatorX<LocalStateType>::_is_serial_operator =
+ tnode.__isset.is_serial_operator && tnode.is_serial_operator;
_init_join_op();
if (_is_mark_join) {
DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op ==
TJoinOp::LEFT_SEMI_JOIN ||
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp
b/be/src/pipeline/exec/join_probe_operator.cpp
index 8e5010d7513..76dc75a90d8 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -220,6 +220,7 @@
JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
: true)
) {
+ Base::_is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
if (tnode.__isset.hash_join_node) {
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 4121de64210..5b0fec159e2 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -203,7 +203,9 @@ public:
}
DataDistribution required_data_distribution() const override {
- if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op ==
TJoinOp::RIGHT_ANTI_JOIN ||
+ _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN) {
return {ExchangeType::NOOP};
}
return {ExchangeType::ADAPTIVE_PASSTHROUGH};
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 5a13fdcbd84..6e3099db748 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -141,8 +141,9 @@ std::string
PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentatio
std::string OperatorXBase::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}",
- std::string(indentation_level * 2, ' '), _op_name,
node_id(), _parallel_tasks);
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={},
_is_serial_operator={}",
+ std::string(indentation_level * 2, ' '), _op_name,
node_id(), _parallel_tasks,
+ _is_serial_operator);
return fmt::to_string(debug_string_buffer);
}
@@ -363,8 +364,8 @@ void
PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}",
std::string(indentation_level * 2, ' '),
- _name, node_id());
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, _is_serial_operator={}",
+ std::string(indentation_level * 2, ' '), _name, node_id(),
_is_serial_operator);
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index b5bd0fe4713..5df0a19498f 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -101,6 +101,9 @@ public:
return Status::OK();
}
+ // Operators need to be executed serially. (e.g. finalized agg without key)
+ [[nodiscard]] virtual bool is_serial_operator() const { return
_is_serial_operator; }
+
[[nodiscard]] bool is_closed() const { return _is_closed; }
virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
@@ -122,6 +125,7 @@ protected:
bool _is_closed;
bool _followed_by_shuffled_operator = false;
+ bool _is_serial_operator = false;
};
class PipelineXLocalStateBase {
@@ -444,7 +448,7 @@ public:
Status init(const TDataSink& tsink) override;
[[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
- const bool is_shuffled_hash_join,
+ const bool use_global_hash_shuffle,
const std::map<int, int>&
shuffle_idx_to_instance_idx) {
return Status::InternalError("init() is only implemented in local
exchange!");
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 48df5587198..655a6e19725 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -118,6 +118,10 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState*
state) {
return _agg_source_operator->close(state);
}
+bool PartitionedAggSourceOperatorX::is_serial_operator() const {
+ return _agg_source_operator->is_serial_operator();
+}
+
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index edae99c716a..7e73241745e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -91,6 +91,8 @@ public:
bool is_source() const override { return true; }
+ bool is_serial_operator() const override;
+
private:
friend class PartitionedAggLocalState;
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index ee8689a8084..6d6684437b8 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -90,7 +90,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int
operator_id, const TP
:
std::vector<TExpr> {}),
_algorithm(tnode.sort_node.__isset.algorithm ?
tnode.sort_node.algorithm
:
TSortAlgorithm::FULL_SORT),
- _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {}
+ _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index 8462472dd02..0829c38b40f 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -69,8 +69,9 @@ public:
} else if (_merge_by_exchange) {
// The current sort node is used for the ORDER BY
return {ExchangeType::PASSTHROUGH};
+ } else {
+ return {ExchangeType::NOOP};
}
- return
DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}
bool require_shuffled_data_distribution() const override { return
_is_analytic_sort; }
bool require_data_distribution() const override { return _is_colocate; }
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp
b/be/src/pipeline/exec/sort_source_operator.cpp
index 02a99e183c8..7f801b79c0b 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -30,7 +30,9 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnod
const DescriptorTbl& descs)
: OperatorX<SortLocalState>(pool, tnode, operator_id, descs),
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
- _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0)
{}
+ _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0)
{
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+}
Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(Base::init(tnode, state));
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 2d112ebf2df..200e7de8597 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -63,7 +63,9 @@ public:
using Base = OperatorX<UnionSourceLocalState>;
UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs)
- : Base(pool, tnode, operator_id, descs),
_child_size(tnode.num_children) {};
+ : Base(pool, tnode, operator_id, descs),
_child_size(tnode.num_children) {
+ _is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
+ }
~UnionSourceOperatorX() override = default;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index d87113ca80a..ff243186c47 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -36,17 +36,17 @@ std::vector<Dependency*>
LocalExchangeSinkLocalState::dependencies() const {
}
Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int
num_buckets,
- const bool
should_disable_bucket_shuffle,
+ const bool use_global_hash_shuffle,
const std::map<int, int>&
shuffle_idx_to_instance_idx) {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) +
")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
- _use_global_shuffle = should_disable_bucket_shuffle;
+ _use_global_shuffle = use_global_hash_shuffle;
// For shuffle join, if data distribution has been broken by previous
operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To
be mentioned,
// we should use map shuffle idx to instance idx because all instances
will be
// distributed to all BEs. Otherwise, we should use shuffle idx
directly.
- if (should_disable_bucket_shuffle) {
+ if (use_global_hash_shuffle) {
std::for_each(shuffle_idx_to_instance_idx.begin(),
shuffle_idx_to_instance_idx.end(),
[&](const auto& item) {
DCHECK(item.first != -1);
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 1cd9736d429..09b1f2cc310 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -100,7 +100,7 @@ public:
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
}
- Status init(ExchangeType type, const int num_buckets, const bool
should_disable_bucket_shuffle,
+ Status init(ExchangeType type, const int num_buckets, const bool
use_global_hash_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx)
override;
Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 6e83c7805e4..5b93fbdf1f8 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -22,6 +22,7 @@
#include <utility>
#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_task.h"
namespace doris::pipeline {
@@ -31,7 +32,47 @@ void Pipeline::_init_profile() {
_pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s));
}
-Status Pipeline::add_operator(OperatorPtr& op) {
+bool Pipeline::need_to_local_exchange(const DataDistribution
target_data_distribution,
+ const int idx) const {
+ // If serial operator exists after `idx`-th operator, we should not
improve parallelism.
+ if (std::any_of(_operators.begin() + idx, _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); })) {
+ return false;
+ }
+ if (std::all_of(_operators.begin(), _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); })) {
+ if (!_sink->is_serial_operator()) {
+ return true;
+ }
+ } else if (std::any_of(_operators.begin(), _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); })) {
+ return true;
+ }
+
+ if (target_data_distribution.distribution_type !=
ExchangeType::BUCKET_HASH_SHUFFLE &&
+ target_data_distribution.distribution_type !=
ExchangeType::HASH_SHUFFLE) {
+ return true;
+ } else if (_operators.front()->ignore_data_hash_distribution()) {
+ if (_data_distribution.distribution_type ==
target_data_distribution.distribution_type &&
+ (_data_distribution.partition_exprs.empty() ||
+ target_data_distribution.partition_exprs.empty())) {
+ return true;
+ }
+ return _data_distribution.distribution_type !=
target_data_distribution.distribution_type &&
+ !(is_hash_exchange(_data_distribution.distribution_type) &&
+ is_hash_exchange(target_data_distribution.distribution_type));
+ } else {
+ return _data_distribution.distribution_type !=
target_data_distribution.distribution_type &&
+ !(is_hash_exchange(_data_distribution.distribution_type) &&
+ is_hash_exchange(target_data_distribution.distribution_type));
+ }
+}
+
+Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) {
+ if (parallelism > 0 && op->is_serial_operator()) {
+ set_num_tasks(parallelism);
+ op->set_ignore_data_distribution();
+ }
op->set_parallel_tasks(num_tasks());
_operators.emplace_back(op);
if (op->is_source()) {
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 8a20ccb631c..ef0ae9e9a75 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -44,14 +44,16 @@ class Pipeline : public
std::enable_shared_from_this<Pipeline> {
public:
explicit Pipeline(PipelineId pipeline_id, int num_tasks,
- std::weak_ptr<PipelineFragmentContext> context)
- : _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
+ std::weak_ptr<PipelineFragmentContext> context, int
num_tasks_of_parent)
+ : _pipeline_id(pipeline_id),
+ _num_tasks(num_tasks),
+ _num_tasks_of_parent(num_tasks_of_parent) {
_init_profile();
_tasks.resize(_num_tasks, nullptr);
}
// Add operators for pipelineX
- Status add_operator(OperatorPtr& op);
+ Status add_operator(OperatorPtr& op, const int parallelism);
// prepare operators for pipelineX
Status prepare(RuntimeState* state);
@@ -71,28 +73,8 @@ public:
return idx == ExchangeType::HASH_SHUFFLE || idx ==
ExchangeType::BUCKET_HASH_SHUFFLE;
}
- bool need_to_local_exchange(const DataDistribution
target_data_distribution) const {
- if (target_data_distribution.distribution_type !=
ExchangeType::BUCKET_HASH_SHUFFLE &&
- target_data_distribution.distribution_type !=
ExchangeType::HASH_SHUFFLE) {
- return true;
- } else if (_operators.front()->ignore_data_hash_distribution()) {
- if (_data_distribution.distribution_type ==
- target_data_distribution.distribution_type &&
- (_data_distribution.partition_exprs.empty() ||
- target_data_distribution.partition_exprs.empty())) {
- return true;
- }
- return _data_distribution.distribution_type !=
- target_data_distribution.distribution_type &&
- !(is_hash_exchange(_data_distribution.distribution_type) &&
-
is_hash_exchange(target_data_distribution.distribution_type));
- } else {
- return _data_distribution.distribution_type !=
- target_data_distribution.distribution_type &&
- !(is_hash_exchange(_data_distribution.distribution_type) &&
-
is_hash_exchange(target_data_distribution.distribution_type));
- }
- }
+ bool need_to_local_exchange(const DataDistribution
target_data_distribution,
+ const int idx) const;
void init_data_distribution() {
set_data_distribution(_operators.front()->required_data_distribution());
}
@@ -120,6 +102,14 @@ public:
for (auto& op : _operators) {
op->set_parallel_tasks(_num_tasks);
}
+
+#ifndef NDEBUG
+ if (num_tasks > 1 &&
+ std::any_of(_operators.begin(), _operators.end(),
+ [&](OperatorPtr op) -> bool { return
op->is_serial_operator(); })) {
+ DCHECK(false) << debug_string();
+ }
+#endif
}
int num_tasks() const { return _num_tasks; }
bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
@@ -136,6 +126,8 @@ public:
return fmt::to_string(debug_string_buffer);
}
+ int num_tasks_of_parent() const { return _num_tasks_of_parent; }
+
private:
void _init_profile();
@@ -173,6 +165,8 @@ private:
std::atomic<int> _num_tasks_running = 0;
// Tasks in this pipeline.
std::vector<PipelineTask*> _tasks;
+ // Parallelism of parent pipeline.
+ const int _num_tasks_of_parent;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 28cfefbf6c1..fd3baefa76f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -214,8 +214,9 @@ void PipelineFragmentContext::cancel(const Status reason) {
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx)
{
PipelineId id = _next_pipeline_id++;
auto pipeline = std::make_shared<Pipeline>(
- id, _num_instances,
-
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
+ id, parent ? std::min(parent->num_tasks(), _num_instances) :
_num_instances,
+
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
+ parent ? parent->num_tasks() : _num_instances);
if (idx >= 0) {
_pipelines.insert(_pipelines.begin() + idx, pipeline);
} else {
@@ -235,6 +236,8 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
if (request.__isset.query_options &&
request.query_options.__isset.execution_timeout) {
_timeout = request.query_options.execution_timeout;
}
+ _use_serial_source =
+ request.fragment.__isset.use_serial_source &&
request.fragment.use_serial_source;
_fragment_level_profile =
std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
@@ -749,13 +752,12 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
const bool followed_by_shuffled_operator =
operators.size() > idx ?
operators[idx]->followed_by_shuffled_operator()
:
cur_pipe->sink()->followed_by_shuffled_operator();
- const bool should_disable_bucket_shuffle =
+ const bool use_global_hash_shuffle =
bucket_seq_to_instance_idx.empty() &&
shuffle_idx_to_instance_idx.find(-1) ==
shuffle_idx_to_instance_idx.end() &&
- followed_by_shuffled_operator;
+ followed_by_shuffled_operator && !_use_serial_source;
sink.reset(new LocalExchangeSinkOperatorX(
- sink_id, local_exchange_id,
- should_disable_bucket_shuffle ? _total_instances : _num_instances,
+ sink_id, local_exchange_id, use_global_hash_shuffle ?
_total_instances : _num_instances,
data_distribution.partition_exprs, bucket_seq_to_instance_idx));
if (bucket_seq_to_instance_idx.empty() &&
data_distribution.distribution_type ==
ExchangeType::BUCKET_HASH_SHUFFLE) {
@@ -763,8 +765,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
RETURN_IF_ERROR(new_pip->set_sink(sink));
RETURN_IF_ERROR(new_pip->sink()->init(data_distribution.distribution_type,
num_buckets,
- should_disable_bucket_shuffle,
- shuffle_idx_to_instance_idx));
+ use_global_hash_shuffle,
shuffle_idx_to_instance_idx));
// 2. Create and initialize LocalExchangeSharedState.
std::shared_ptr<LocalExchangeSharedState> shared_state =
@@ -775,7 +776,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
std::max(cur_pipe->num_tasks(), _num_instances),
- should_disable_bucket_shuffle ? _total_instances :
_num_instances,
+ use_global_hash_shuffle ? _total_instances : _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
?
_runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
@@ -915,11 +916,11 @@ Status PipelineFragmentContext::_add_local_exchange(
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_distribution) {
- if (_num_instances <= 1) {
+ if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
return Status::OK();
}
- if (!cur_pipe->need_to_local_exchange(data_distribution)) {
+ if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
return Status::OK();
}
*do_local_exchange = true;
@@ -1154,7 +1155,8 @@ Status
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
// 1. create and set the source operator of
multi_cast_data_stream_source for new pipeline
source_op.reset(new MultiCastDataStreamerSourceOperatorX(
i, pool, thrift_sink.multi_cast_stream_sink.sinks[i],
row_desc, source_id));
- RETURN_IF_ERROR(new_pipeline->add_operator(source_op));
+ RETURN_IF_ERROR(new_pipeline->add_operator(
+ source_op, params.__isset.parallel_instances ?
params.parallel_instances : 0));
// 2. create and set sink operator of data stream sender for new
pipeline
DataSinkOperatorPtr sink_op;
@@ -1203,7 +1205,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new OlapScanOperatorX(
pool, tnode, next_operator_id(), descs, _num_instances,
enable_query_cache ? request.fragment.query_cache_param :
TQueryCacheParam {}));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
if (request.__isset.parallel_instances) {
cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
@@ -1216,7 +1219,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_query_ctx->query_mem_tracker->is_group_commit_load = true;
#endif
op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(),
descs, _num_instances));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
if (request.__isset.parallel_instances) {
cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
@@ -1226,7 +1230,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
case doris::TPlanNodeType::JDBC_SCAN_NODE: {
if (config::enable_java_support) {
op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(),
descs, _num_instances));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
} else {
return Status::InternalError(
"Jdbc scan node is disabled, you can change be config
enable_java_support "
@@ -1240,7 +1245,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case doris::TPlanNodeType::FILE_SCAN_NODE: {
op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs,
_num_instances));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
if (request.__isset.parallel_instances) {
cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
@@ -1250,7 +1256,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
case TPlanNodeType::ES_SCAN_NODE:
case TPlanNodeType::ES_HTTP_SCAN_NODE: {
op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs,
_num_instances));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
if (request.__isset.parallel_instances) {
cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
@@ -1261,7 +1268,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
int num_senders = find_with_default(request.per_exch_num_senders,
tnode.node_id, 0);
DCHECK_GT(num_senders, 0);
op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(),
descs, num_senders));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
if (request.__isset.parallel_instances) {
op->set_ignore_data_distribution();
cur_pipe->set_num_tasks(request.parallel_instances);
@@ -1280,7 +1288,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
auto cache_source_id = next_operator_id();
op.reset(new CacheSourceOperatorX(pool, cache_node_id,
cache_source_id,
request.fragment.query_cache_param));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1315,7 +1324,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_require_bucket_distribution));
op->set_followed_by_shuffled_operator(false);
_require_bucket_distribution = true;
- RETURN_IF_ERROR(new_pipe->add_operator(op));
+ RETURN_IF_ERROR(new_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
cur_pipe = new_pipe;
} else {
@@ -1324,7 +1334,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution ||
op->require_data_distribution();
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
}
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation &&
@@ -1335,11 +1346,13 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new StreamingAggOperatorX(pool, next_operator_id(),
tnode, descs));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
- RETURN_IF_ERROR(new_pipe->add_operator(op));
+ RETURN_IF_ERROR(new_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
cur_pipe = new_pipe;
} else {
op.reset(new StreamingAggOperatorX(pool, next_operator_id(),
tnode, descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
}
} else {
// create new pipeline to add query cache operator
@@ -1355,10 +1368,12 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
if (enable_query_cache) {
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
- RETURN_IF_ERROR(new_pipe->add_operator(op));
+ RETURN_IF_ERROR(new_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
cur_pipe = new_pipe;
} else {
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
}
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1406,7 +1421,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
pool, tnode_, next_operator_id(), descs, partition_count);
probe_operator->set_inner_operators(inner_sink_operator,
inner_probe_operator);
op = std::move(probe_operator);
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1430,7 +1446,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
} else {
op.reset(new HashJoinProbeOperatorX(pool, tnode,
next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1457,7 +1474,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::CROSS_JOIN_NODE: {
op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode,
next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1480,7 +1498,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
int child_count = tnode.num_children;
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(),
descs));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1508,7 +1527,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
} else {
op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(),
descs));
}
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1535,7 +1555,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case doris::TPlanNodeType::PARTITION_SORT_NODE: {
op.reset(new PartitionSortSourceOperatorX(pool, tnode,
next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1553,7 +1574,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::ANALYTIC_EVAL_NODE: {
op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1575,39 +1597,44 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+ pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
request));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+ pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
request));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::REPEAT_NODE: {
op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::TABLE_FUNCTION_NODE: {
op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::EMPTY_SET_NODE: {
op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::DATA_GEN_SCAN_NODE: {
op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
if (request.__isset.parallel_instances) {
cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
@@ -1616,17 +1643,20 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::SCHEMA_SCAN_NODE: {
op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::META_SCAN_NODE: {
op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(),
descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
case TPlanNodeType::SELECT_NODE: {
op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
break;
}
default:
@@ -1642,9 +1672,11 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
OperatorPtr& op,
- PipelinePtr& cur_pipe, int parent_idx, int child_idx) {
+ PipelinePtr& cur_pipe, int parent_idx, int child_idx,
+ const doris::TPipelineFragmentParams& request) {
op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode,
next_operator_id(), descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 0749729789e..6caa0e5c106 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -121,7 +121,7 @@ public:
_tasks[j][i]->stop_if_finished();
}
}
- };
+ }
private:
Status _build_pipelines(ObjectPool* pool, const
doris::TPipelineFragmentParams& request,
@@ -140,7 +140,8 @@ private:
Status _build_operators_for_set_operation_node(ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs,
OperatorPtr& op,
PipelinePtr& cur_pipe, int
parent_idx,
- int child_idx);
+ int child_idx,
+ const
doris::TPipelineFragmentParams& request);
Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
@@ -224,6 +225,7 @@ private:
int _num_instances = 1;
int _timeout = -1;
+ bool _use_serial_source = false;
OperatorPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is
the number of pipelines.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 4dca9384d65..55d1b4b50c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -488,6 +488,11 @@ public class AggregationNode extends PlanNode {
}
}
+ @Override
+ public boolean isSerialOperator() {
+ return aggInfo.getGroupingExprs().isEmpty() && needsFinalize;
+ }
+
public void setColocate(boolean colocate) {
isColocate = colocate;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index cdbf827aed9..dce6c3d1b04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -296,4 +296,9 @@ public class AnalyticEvalNode extends PlanNode {
return output.toString();
}
+
+ @Override
+ public boolean isSerialOperator() {
+ return partitionExprs.isEmpty();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
index 57d9ce8742f..a4c4aa42c65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
@@ -116,4 +116,9 @@ public class AssertNumRowsNode extends PlanNode {
public int getNumInstances() {
return 1;
}
+
+ @Override
+ public boolean isSerialOperator() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 9c6ba83408a..ce57a57c377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -90,6 +90,10 @@ public class DataPartition {
return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
}
+ public boolean isTabletSinkShufflePartition() {
+ return type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED;
+ }
+
public TPartitionType getType() {
return type;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
index 867c220d9fe..f6ddf23429e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
@@ -81,4 +81,8 @@ public class EmptySetNode extends PlanNode {
return 1;
}
+ @Override
+ public boolean isSerialOperator() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 4ada9a82f7c..7af09287191 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -195,6 +195,11 @@ public class ExchangeNode extends PlanNode {
return prefix + "offset: " + offset + "\n";
}
+ @Override
+ public boolean isMerging() {
+ return mergeInfo != null;
+ }
+
public boolean isRightChildOfBroadcastHashJoin() {
return isRightChildOfBroadcastHashJoin;
}
@@ -202,4 +207,9 @@ public class ExchangeNode extends PlanNode {
public void setRightChildOfBroadcastHashJoin(boolean value) {
isRightChildOfBroadcastHashJoin = value;
}
+
+ @Override
+ public boolean isSerialOperator() {
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
index 91a3c26e770..5dc81e29d85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
@@ -597,7 +597,6 @@ public abstract class JoinNodeBase extends PlanNode {
this.useSpecificProjections = useSpecificProjections;
}
-
public boolean isUseSpecificProjections() {
return useSpecificProjections;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 30c0a2d0394..c7b3525e4cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -281,4 +281,21 @@ public class NestedLoopJoinNode extends JoinNodeBase {
}
return output.toString();
}
+
+ /**
+ * If joinOp is one of type below:
+ * 1. NULL_AWARE_LEFT_ANTI_JOIN
+ * 2. RIGHT_OUTER_JOIN
+ * 3. RIGHT_ANTI_JOIN
+ * 4. RIGHT_SEMI_JOIN
+ * 5. FULL_OUTER_JOIN
+ * We will execute this join serially on a single instance.
+ * @return true if this nested loop join must be executed serially
+ */
+ @Override
+ public boolean isSerialOperator() {
+ return joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN || joinOp ==
JoinOperator.RIGHT_OUTER_JOIN
+ || joinOp == JoinOperator.RIGHT_ANTI_JOIN || joinOp ==
JoinOperator.RIGHT_SEMI_JOIN
+ || joinOp == JoinOperator.FULL_OUTER_JOIN;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index ae1d34308a3..3e3c49bf675 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -341,6 +341,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// TODO chenhao , calculated by cost
result.setMinReservationBytes(0);
result.setInitialReservationTotalClaims(0);
+ result.setUseSerialSource(useSerialSource(ConnectContext.get()));
return result;
}
@@ -502,4 +503,38 @@ public class PlanFragment extends TreeNode<PlanFragment> {
public boolean hasNullAwareLeftAntiJoin() {
return planRoot.isNullAwareLeftAntiJoin();
}
+
+ private boolean isMergingFragment() {
+ return planRoot.isMerging();
+ }
+
+ public boolean useSerialSource(ConnectContext context) {
+ return context != null
+ &&
context.getSessionVariable().isIgnoreStorageDataDistribution()
+ && !hasNullAwareLeftAntiJoin()
+ // If input data partition is UNPARTITIONED and sink is
DataStreamSink and root node is not a serial
+ // operator, we use local exchange to improve parallelism
+ && getDataPartition() == DataPartition.UNPARTITIONED &&
!children.isEmpty()
+ && sink instanceof DataStreamSink &&
!planRoot.isSerialOperator()
+ /**
+ * If table `t1` has unique key `k1` and value column `v1`.
+ * Now use plan below to load data into `t1`:
+ * ```
+ * FRAGMENT 0:
+ * Merging Exchange (id = 1)
+ * NL Join (id = 2)
+ * DataStreamSender (id = 3, dst_id = 3)
(TABLET_SINK_SHUFFLE_PARTITIONED)
+ *
+ * FRAGMENT 1:
+ * Exchange (id = 3)
+ * OlapTableSink (id = 4) ```
+ *
+ * In this plan, `Exchange (id = 1)` needs to do merge sort
using column `k1` and `v1` so parallelism
+ * of FRAGMENT 0 must be 1 and data will be shuffled to
FRAGMENT 1 which also has only 1 instance
+ * because this loading job relies on the global ordering of
column `k1` and `v1`.
+ *
+ * So FRAGMENT 0 should not use serial source.
+ */
+ && !isMergingFragment();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 1e9d5646939..d1ba493682b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -279,6 +279,10 @@ public abstract class PlanNode extends TreeNode<PlanNode>
implements PlanStats {
return children.stream().anyMatch(PlanNode::isNullAwareLeftAntiJoin);
}
+ public boolean isMerging() {
+ return children.stream().anyMatch(PlanNode::isMerging);
+ }
+
public PlanFragment getFragment() {
return fragment;
}
@@ -639,6 +643,7 @@ public abstract class PlanNode extends TreeNode<PlanNode>
implements PlanStats {
TPlanNode msg = new TPlanNode();
msg.node_id = id.asInt();
msg.setNereidsId(nereidsId);
+ msg.setIsSerialOperator(isSerialOperator());
msg.num_children = children.size();
msg.limit = limit;
for (TupleId tid : tupleIds) {
@@ -1374,4 +1379,9 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
return true;
});
}
+
+ // Whether this operator needs to be executed serially (e.g. a finalized agg without group-by keys).
+ public boolean isSerialOperator() {
+ return false;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
index 3c6a88cea08..407d8a6444c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
@@ -200,4 +200,9 @@ public class RepeatNode extends PlanNode {
}
return output.toString();
}
+
+ @Override
+ public boolean isSerialOperator() {
+ return children.get(0).isSerialOperator();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index a92cac7b510..1681699d651 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -848,4 +848,9 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
public long getSelectedSplitNum() {
return selectedSplitNum;
}
+
+ @Override
+ public boolean isSerialOperator() {
+ return true;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
index 6c6b665b00a..b3b088837a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
@@ -109,4 +109,9 @@ public class SelectNode extends PlanNode {
}
return output.toString();
}
+
+ @Override
+ public boolean isSerialOperator() {
+ return children.get(0).isSerialOperator();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index e3c405bcbab..fc1c50c0bba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -389,6 +389,11 @@ public class SortNode extends PlanNode {
return new HashSet<>(result);
}
+ @Override
+ public boolean isSerialOperator() {
+ return !isAnalyticSort && !mergeByexchange;
+ }
+
public void setColocate(boolean colocate) {
isColocate = colocate;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index 40982d07e77..bf48a770f1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -42,4 +42,9 @@ public class UnionNode extends SetOperationNode {
protected void toThrift(TPlanNode msg) {
toThrift(msg, TPlanNodeType.UNION_NODE);
}
+
+ @Override
+ public boolean isSerialOperator() {
+ return children.isEmpty();
+ }
}
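
Taken together, the `isSerialOperator` overrides above define a simple recursive property of the plan tree: scans are marked serial, pass-through nodes such as SelectNode and RepeatNode inherit the flag from their child, a UnionNode is serial only when it has no children, and everything else keeps the default `false`. A toy sketch of that propagation, using hypothetical classes rather than the FE hierarchy:

```java
import java.util.ArrayList;
import java.util.List;

abstract class Node {
    final List<Node> children = new ArrayList<>();
    Node add(Node child) { children.add(child); return this; }
    boolean isSerialOperator() { return false; }          // default: operator can run in parallel
}

class Scan extends Node {
    @Override boolean isSerialOperator() { return true; } // scans are marked serial in this patch
}

class Select extends Node {
    @Override boolean isSerialOperator() { return children.get(0).isSerialOperator(); } // pass-through
}

class HashAgg extends Node { }                            // keeps the default: false

public class SerialPropagationSketch {
    public static void main(String[] args) {
        Node filteredScan = new Select().add(new Scan());
        Node agg = new HashAgg().add(filteredScan);
        System.out.println(filteredScan.isSerialOperator()); // true: inherited from the scan
        System.out.println(agg.isSerialOperator());          // false: the agg breaks the chain
    }
}
```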
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8e580c549df..4eda6775b5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1779,6 +1779,20 @@ public class Coordinator implements CoordInterface {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 0, params);
params.instanceExecParams.add(instanceParam);
+
+ // Using a serial source means a serial source operator will be used in this fragment
+ // (i.e. data will be shuffled to only 1 exchange operator) and then split by the
+ // following local exchanger
+ int expectedInstanceNum = fragment.getParallelExecNum();
+ boolean useSerialSource = fragment.useSerialSource(context) && useNereids
+ && fragment.queryCacheParam == null;
+ if (useSerialSource) {
+ for (int j = 1; j < expectedInstanceNum; j++) {
+ params.instanceExecParams.add(new FInstanceExecParam(null, execHostport, 0, params));
+ }
+ params.ignoreDataDistribution = true;
+ params.parallelTasksNum = 1;
+ }
continue;
}
@@ -1808,6 +1822,10 @@ public class Coordinator implements CoordInterface {
if (leftMostNode.getNumInstances() == 1) {
exchangeInstances = 1;
}
+ // Using a serial source means a serial source operator will be used in this fragment
+ // (i.e. data will be shuffled to only 1 exchange operator) and then split by the
+ // following local exchanger
+ boolean useSerialSource = fragment.useSerialSource(context) && useNereids
+ && fragment.queryCacheParam == null;
if (exchangeInstances > 0 && fragmentExecParamsMap.get(inputFragmentId).instanceExecParams.size() > exchangeInstances) {
// random select some instance
@@ -1825,12 +1843,16 @@ public class Coordinator implements CoordInterface {
hosts.get(index % hosts.size()), 0, params);
params.instanceExecParams.add(instanceParam);
}
+ params.ignoreDataDistribution = useSerialSource;
+ params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size();
} else {
for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params);
params.instanceExecParams.add(instanceParam);
}
+ params.ignoreDataDistribution = useSerialSource;
+ params.parallelTasksNum = useSerialSource ? 1 : params.instanceExecParams.size();
}
// When group by cardinality is smaller than number of backend, only some backends always
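
The first Coordinator hunk above can be condensed into a small sketch (assumed helper types, not the real `FInstanceExecParam` or fragment parameter classes): for a serial-source fragment the coordinator still creates the full expected number of instances on the chosen host, but it marks the fragment to ignore data distribution and run a single parallel task, so one exchange source reads the shuffled data and the local exchanger fans it out to the remaining instances.

```java
import java.util.ArrayList;
import java.util.List;

public class SerialSourceInstanceSketch {
    // Simplified stand-in for the per-fragment execution parameters.
    static class FragmentParams {
        final List<String> instanceHosts = new ArrayList<>();
        boolean ignoreDataDistribution;
        int parallelTasksNum;
    }

    static FragmentParams buildParams(String host, int expectedInstanceNum, boolean useSerialSource) {
        FragmentParams params = new FragmentParams();
        params.instanceHosts.add(host);                       // the single "real" source instance
        if (useSerialSource) {
            for (int j = 1; j < expectedInstanceNum; j++) {
                params.instanceHosts.add(host);               // extra instances for the parallel part
            }
            params.ignoreDataDistribution = true;
            params.parallelTasksNum = 1;                      // only one task reads the exchange
        } else {
            params.parallelTasksNum = params.instanceHosts.size();
        }
        return params;
    }

    public static void main(String[] args) {
        FragmentParams p = buildParams("be-1:9060", 8, true);
        System.out.println(p.instanceHosts.size() + " instances, "
                + p.parallelTasksNum + " parallel task(s)");  // 8 instances, 1 parallel task(s)
    }
}
```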
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 5c0273da791..eb5266942c0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1366,6 +1366,7 @@ struct TPlanNode {
49: optional i64 push_down_count
50: optional list<list<Exprs.TExpr>> distribute_expr_lists
+ 51: optional bool is_serial_operator
// projections is final projections, which means projecting into results and materializing them into the output block.
101: optional list<Exprs.TExpr> projections
102: optional Types.TTupleId output_tuple_id
diff --git a/gensrc/thrift/Planner.thrift b/gensrc/thrift/Planner.thrift
index 866d8d45320..ffcc33638db 100644
--- a/gensrc/thrift/Planner.thrift
+++ b/gensrc/thrift/Planner.thrift
@@ -64,6 +64,10 @@ struct TPlanFragment {
8: optional i64 initial_reservation_total_claims
9: optional QueryCache.TQueryCacheParam query_cache_param
+
+ // Using a serial source means a serial source operator will be used in this fragment
+ // (i.e. data will be shuffled to only 1 exchange operator) and then split by the
+ // following local exchanger
+ 10: optional bool use_serial_source
}
// location information for a single scan range
diff --git a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
index 2493a7df5de..049cbe0b4d7 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
@@ -177,15 +177,15 @@ suite('complex_insert') {
sql 'insert into t1(id, c1, c2, c3) select id, c1 * 2, c2, c3 from t1'
sql 'sync'
- qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
sql 'insert into t2(id, c1, c2, c3) select id, c1, c2 * 2, c3 from t2'
sql 'sync'
- qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
sql 'insert into t2(c1, c3) select c1 + 1, c3 + 1 from (select id, c1, c3 from t1 order by id, c1 limit 10) t1, t3'
sql 'sync'
- qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+ qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, t3.id'
sql 'drop table if exists agg_have_dup_base'
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
index 997230b1a06..950b6171c7c 100644
--- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
+++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
@@ -52,7 +52,7 @@ suite("local_shuffle") {
set force_to_local_shuffle=true;
"""
- order_qt_read_single_olap_table "select * from test_local_shuffle1"
+ order_qt_read_single_olap_table "select * from test_local_shuffle1 order by id, id2"
order_qt_broadcast_join """
select *
@@ -96,7 +96,7 @@ suite("local_shuffle") {
) a
right outer join [shuffle]
test_local_shuffle2
- on a.id=test_local_shuffle2.id2
+ on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, test_local_shuffle2.id2
"""
order_qt_bucket_shuffle_with_prune_tablets2 """
@@ -109,7 +109,7 @@ suite("local_shuffle") {
from test_local_shuffle1
where id=1
) a
- on a.id=test_local_shuffle2.id2
+ on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, test_local_shuffle2.id2
"""
order_qt_bucket_shuffle_with_prune_tablets3 """
@@ -150,11 +150,11 @@ suite("local_shuffle") {
"""
order_qt_fillup_bucket """
- SELECT cast(a.c0 as int), cast(b.c0 as int) FROM
+ SELECT cast(a.c0 as int), cast(b.c0 as int) res FROM
(select * from test_local_shuffle3 where c0 =1)a
RIGHT OUTER JOIN
(select * from test_local_shuffle4)b
- ON a.c0 = b.c0
+ ON a.c0 = b.c0 order by res
"""
multi_sql """
@@ -182,6 +182,6 @@ suite("local_shuffle") {
) a
inner join [shuffle]
test_shuffle_left_with_local_shuffle b
- on a.id2=b.id;
+ on a.id2=b.id order by a.id2;
"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]