This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dae604b7964 [pipelineX](improvement) Adjust local exchange strategy (#29915)
dae604b7964 is described below
commit dae604b7964d038ae78336b89ce97b743e81e6e5
Author: Gabriel <[email protected]>
AuthorDate: Sat Jan 13 03:46:41 2024 +0800
[pipelineX](improvement) Adjust local exchange strategy (#29915)
---
.../exec/distinct_streaming_aggregation_sink_operator.h | 4 ++++
be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++++
be/src/pipeline/exec/hashjoin_probe_operator.h | 4 ++++
.../local_exchange/local_exchange_sink_operator.h | 15 +++++++++++++--
.../pipeline_x/local_exchange/local_exchanger.cpp | 3 +++
be/src/pipeline/pipeline_x/operator.h | 12 +++++++++---
.../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 5 ++++-
7 files changed, 41 insertions(+), 6 deletions(-)
diff --git
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
index d62178460ea..6607516d6cb 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -110,6 +110,10 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
+
+ DataDistribution required_data_distribution() const override {
+ return
DataSinkOperatorX<DistinctStreamingAggSinkLocalState>::required_data_distribution();
+ }
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 3c1b772b30a..8420719330c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -165,6 +165,10 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
+ bool is_shuffled_hash_join() const override {
+ return _join_distribution == TJoinDistributionType::PARTITIONED;
+ }
+
private:
friend class HashJoinBuildSinkLocalState;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 093884b6d0f..ac7954af13b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -172,6 +172,10 @@ public:
:
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}
+ bool is_shuffled_hash_join() const override {
+ return _join_distribution == TJoinDistributionType::PARTITIONED;
+ }
+
private:
Status _do_evaluate(vectorized::Block& block,
vectorized::VExprContextSPtrs& exprs,
RuntimeProfile::Counter& expr_call_timer,
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index daf75c966af..7275e545205 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -102,10 +102,21 @@ public:
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
}
- Status init(ExchangeType type, int num_buckets) override {
+ Status init(ExchangeType type, const int num_buckets,
+ const bool is_shuffled_hash_join) override {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" +
get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
+ // For a shuffled hash join whose data distribution has been broken by a previous operator,
+ // a HASH_SHUFFLE local exchanger is used to shuffle the data again. In that case each shuffle
+ // index must be mapped to an instance index, because the instances are distributed across
+ // all BEs. Otherwise, the shuffle index is used directly as the instance index (identity map).
+ if (!is_shuffled_hash_join) {
+ _shuffle_idx_to_instance_idx.clear();
+ for (int i = 0; i < _num_partitions; i++) {
+ _shuffle_idx_to_instance_idx.insert({i, i});
+ }
+ }
_partitioner.reset(
new
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
RETURN_IF_ERROR(_partitioner->init(_texprs));
@@ -145,7 +156,7 @@ private:
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
- const std::map<int, int> _shuffle_idx_to_instance_idx;
+ std::map<int, int> _shuffle_idx_to_instance_idx;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 602020c4882..900e31e6631 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -121,6 +121,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
._shuffle_idx_to_instance_idx;
for (size_t i = 0; i < _num_partitions; i++) {
+ DCHECK(map.contains(i)) << " i: " << i << " _num_partitions: " <<
_num_partitions
+ << " map.size(): " << map.size();
+ DCHECK(map[i] >= 0 && map[i] < _num_partitions) << map[i] << " "
<< _num_partitions;
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
if (size > 0) {
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 5304d0074f6..6792ce35f36 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -206,6 +206,8 @@ public:
[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) {
return false; }
+ [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
+
bool can_read() override {
LOG(FATAL) << "should not reach here!";
return false;
@@ -467,14 +469,16 @@ public:
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
Status init(const TDataSink& tsink) override;
- virtual Status init(ExchangeType type, int num_buckets) {
+ [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
+ const bool is_shuffled_hash_join) {
return Status::InternalError("init() is only implemented in local
exchange!");
}
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }
- virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo&
info) = 0;
+ [[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
+ LocalSinkStateInfo& info) =
0;
template <class TARGET>
TARGET& cast() {
@@ -492,12 +496,14 @@ public:
}
virtual void get_dependency(std::vector<DependencySPtr>& dependency,
QueryContext* ctx) = 0;
- virtual DataDistribution required_data_distribution() const {
+ [[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
}
+ [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
+
Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index ffaccebe898..b479e1d9334 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -732,7 +732,10 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
sink_id, local_exchange_id, _total_instances,
data_distribution.partition_exprs,
bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
RETURN_IF_ERROR(new_pip->set_sink(sink));
-
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type,
num_buckets));
+
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type,
num_buckets,
+ operator_xs.size() > idx
+ ?
operator_xs[idx]->is_shuffled_hash_join()
+ :
cur_pipe->sink_x()->is_shuffled_hash_join()));
// 2. Create and initialize LocalExchangeSharedState.
auto shared_state = LocalExchangeSharedState::create_shared();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]