This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4d6d938ea75 [pipelineX](fix) Fix correctness problem due to local hash shuffle (#29881)
4d6d938ea75 is described below
commit 4d6d938ea75eaed8591f5b682843cea252e7912f
Author: Gabriel <[email protected]>
AuthorDate: Thu Jan 11 22:28:00 2024 +0800
[pipelineX](fix) Fix correctness problem due to local hash shuffle (#29881)
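In short: with pipelineX local hash shuffle, ShuffleExchanger::_split_rows enqueued hash partition i straight into data_queue[i]. That is only correct while partitions are computed over this backend's own instances; once they are computed over all instances of the fragment cluster-wide, the partition index must first be translated to a local instance index. This patch has the FE Coordinator record each instance's position in the destination list (recvrId), ship a shuffle_idx_to_instance_idx map plus a total_instances count to the BE via Thrift, and remap every partition through that map before enqueueing.

A minimal C++ sketch of the remapping, simplified from the local_exchanger.cpp hunk below (block bookkeeping and memory accounting elided; pick_destinations is a hypothetical helper for illustration, not part of the patch):

    #include <cstddef>
    #include <map>
    #include <vector>

    // Rows were histogrammed into num_partitions hash partitions. Before the
    // fix, partition i went to data_queue[i]; after it, partition i goes to
    // the local queue named by the FE-provided shuffle_idx_to_instance_idx.
    std::vector<int> pick_destinations(
            const std::map<int, int>& shuffle_idx_to_instance_idx,
            const std::vector<size_t>& partition_rows_histogram,
            size_t num_partitions) {
        std::vector<int> dest(num_partitions, -1); // -1: no local rows here
        for (size_t i = 0; i < num_partitions; i++) {
            size_t start = partition_rows_histogram[i];
            size_t size = partition_rows_histogram[i + 1] - start;
            if (size > 0) {
                // Global shuffle index -> index of a locally owned queue.
                dest[i] = shuffle_idx_to_instance_idx.at(static_cast<int>(i));
            }
        }
        return dest;
    }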
---
.../local_exchange/local_exchange_sink_operator.h | 7 ++--
.../pipeline_x/local_exchange/local_exchanger.cpp | 8 +++--
.../pipeline_x/pipeline_x_fragment_context.cpp | 38 ++++++++++++++--------
.../pipeline_x/pipeline_x_fragment_context.h | 8 ++++-
.../main/java/org/apache/doris/qe/Coordinator.java | 14 +++++++-
gensrc/thrift/PaloInternalService.thrift | 2 ++
6 files changed, 56 insertions(+), 21 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 87afb2b098d..daf75c966af 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -86,11 +86,13 @@ public:
using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
                           const std::vector<TExpr>& texprs,
-                          const std::map<int, int>& bucket_seq_to_instance_idx)
+                          const std::map<int, int>& bucket_seq_to_instance_idx,
+                          const std::map<int, int>& shuffle_idx_to_instance_idx)
        : Base(sink_id, dest_id, dest_id),
          _num_partitions(num_partitions),
          _texprs(texprs),
-         _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
+         _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx),
+         _shuffle_idx_to_instance_idx(shuffle_idx_to_instance_idx) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
        return Status::InternalError("{} should not init with TPlanNode", Base::_name);
@@ -143,6 +145,7 @@ private:
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
+ const std::map<int, int> _shuffle_idx_to_instance_idx;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index b2f1bb00ebe..602020c4882 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -118,14 +118,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
new_block_wrapper->ref(_num_partitions);
if (get_type() == ExchangeType::HASH_SHUFFLE) {
+ auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
+ ._shuffle_idx_to_instance_idx;
for (size_t i = 0; i < _num_partitions; i++) {
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
if (size > 0) {
                local_state._shared_state->add_mem_usage(
-                        i, new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(i);
+                        map[i], new_block_wrapper->data_block.allocated_bytes(), false);
+                data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}});
+                local_state._shared_state->set_ready_to_read(map[i]);
} else {
new_block_wrapper->unref(local_state._shared_state);
}
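Note that _num_partitions for a HASH_SHUFFLE exchanger now equals _total_instances (see the ShuffleExchanger construction in the next file), so the loop above iterates over global shuffle indexes and map[i] translates each one to a queue owned by a local instance. Also worth noting: auto map = ... takes a copy of the operator's const std::map on every _split_rows call; a const auto& binding would avoid that copy.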
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 4cc00312697..ffaccebe898 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -163,6 +163,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
return Status::InternalError("Already prepared");
}
_num_instances = request.local_params.size();
+    _total_instances = request.__isset.total_instances ? request.total_instances : _num_instances;
_runtime_profile.reset(new RuntimeProfile("PipelineContext"));
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
@@ -235,8 +236,9 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
}
if (_enable_local_shuffle()) {
-        RETURN_IF_ERROR(
-                _plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx));
+        RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
+                                             request.bucket_seq_to_instance_idx,
+                                             request.shuffle_idx_to_instance_idx));
}
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
@@ -254,7 +256,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}
Status PipelineXFragmentContext::_plan_local_exchange(
-        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx) {
+        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
_pipelines[pip_idx]->init_data_distribution();
// Set property if child pipeline is not join operator's child.
@@ -274,6 +277,7 @@ Status PipelineXFragmentContext::_plan_local_exchange(
? _num_instances
: num_buckets,
pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
+ shuffle_idx_to_instance_idx,
_pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution()));
}
return Status::OK();
@@ -282,6 +286,7 @@ Status PipelineXFragmentContext::_plan_local_exchange(
Status PipelineXFragmentContext::_plan_local_exchange(
int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx,
+ const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_hash_distribution) {
int idx = 1;
bool do_local_exchange = false;
@@ -294,7 +299,8 @@ Status PipelineXFragmentContext::_plan_local_exchange(
RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
-                        bucket_seq_to_instance_idx, ignore_data_hash_distribution));
+                        bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
+                        ignore_data_hash_distribution));
}
if (do_local_exchange) {
        // If local exchange is needed for current operator, we will split this pipeline to
@@ -311,7 +317,8 @@ Status PipelineXFragmentContext::_plan_local_exchange(
RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, ignore_data_hash_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
+                ignore_data_hash_distribution));
}
return Status::OK();
}
@@ -713,6 +720,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx,
+ const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_hash_distribution) {
auto& operator_xs = cur_pipe->operator_xs();
const auto downstream_pipeline_id = cur_pipe->id();
@@ -720,9 +728,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
// 1. Create a new pipeline with local exchange sink.
DataSinkOperatorXPtr sink;
auto sink_id = next_sink_operator_id();
-    sink.reset(new LocalExchangeSinkOperatorX(sink_id, local_exchange_id, _num_instances,
-                                              data_distribution.partition_exprs,
-                                              bucket_seq_to_instance_idx));
+    sink.reset(new LocalExchangeSinkOperatorX(
+            sink_id, local_exchange_id, _total_instances, data_distribution.partition_exprs,
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets));
@@ -731,7 +739,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
switch (data_distribution.distribution_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
-                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances);
+                std::max(cur_pipe->num_tasks(), _num_instances), _total_instances);
break;
case ExchangeType::BUCKET_HASH_SHUFFLE:
shared_state->exchanger = BucketShuffleExchanger::create_unique(
@@ -826,7 +834,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
Status PipelineXFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
-        const std::map<int, int>& bucket_seq_to_instance_idx, const bool ignore_data_distribution) {
+ const std::map<int, int>& bucket_seq_to_instance_idx,
+ const std::map<int, int>& shuffle_idx_to_instance_idx,
+ const bool ignore_data_distribution) {
DCHECK(_enable_local_shuffle());
if (_num_instances <= 1) {
return Status::OK();
@@ -840,9 +850,9 @@ Status PipelineXFragmentContext::_add_local_exchange(
auto& operator_xs = cur_pipe->operator_xs();
auto total_op_num = operator_xs.size();
auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
-    RETURN_IF_ERROR(_add_local_exchange_impl(idx, pool, cur_pipe, new_pip, data_distribution,
-                                             do_local_exchange, num_buckets,
-                                             bucket_seq_to_instance_idx, ignore_data_distribution));
+    RETURN_IF_ERROR(_add_local_exchange_impl(
+            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
    CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size())
<< "total_op_num: " << total_op_num
@@ -855,7 +865,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
RETURN_IF_ERROR(_add_local_exchange_impl(
                new_pip->operator_xs().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2),
                DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, ignore_data_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
}
return Status::OK();
}
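Two details worth calling out in this file: _total_instances is derived with an __isset fallback, so it degrades to _num_instances when the field is unset, and the HASH_SHUFFLE exchanger is now sized by _total_instances rather than _num_instances, which is what keeps local hash partitioning consistent with the global shuffle layout.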
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 34320d64f38..92178d359d9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -126,6 +126,7 @@ private:
                                   PipelinePtr cur_pipe, DataDistribution data_distribution,
                                   bool* do_local_exchange, int num_buckets,
                                   const std::map<int, int>& bucket_seq_to_instance_idx,
+                                   const std::map<int, int>& shuffle_idx_to_instance_idx,
                                   const bool ignore_data_distribution);
    void _inherit_pipeline_properties(const DataDistribution& data_distribution,
                                      PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink);
@@ -133,6 +134,7 @@ private:
                                   PipelinePtr new_pipe, DataDistribution data_distribution,
                                   bool* do_local_exchange, int num_buckets,
                                   const std::map<int, int>& bucket_seq_to_instance_idx,
+                                   const std::map<int, int>& shuffle_idx_to_instance_idx,
                                   const bool ignore_data_distribution);
[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
@@ -160,9 +162,11 @@ private:
                               RuntimeState* state, DescriptorTbl& desc_tbl, PipelineId cur_pipeline_id);
    Status _plan_local_exchange(int num_buckets,
-                                const std::map<int, int>& bucket_seq_to_instance_idx);
+                                const std::map<int, int>& bucket_seq_to_instance_idx,
+                                const std::map<int, int>& shuffle_idx_to_instance_idx);
Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
                                const std::map<int, int>& bucket_seq_to_instance_idx,
+                               const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_distribution);
bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
@@ -239,6 +243,8 @@ private:
std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
    std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
+
+ int _total_instances = -1;
};
} // namespace pipeline
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 12be2b2d8ec..5e2d0ef85c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1611,6 +1611,7 @@ public class Coordinator implements CoordInterface {
                        dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                        instanceExecParams.recvrId = params.destinations.size();
break;
}
}
@@ -1630,6 +1631,7 @@ public class Coordinator implements CoordInterface {
destHosts.put(param.host, param);
param.buildHashTableForBroadcastJoin = true;
                    TPlanFragmentDestination dest = new TPlanFragmentDestination();
+ param.recvrId = params.destinations.size();
dest.fragment_instance_id = param.instanceId;
try {
dest.server = toRpcHost(param.host);
@@ -1653,6 +1655,7 @@ public class Coordinator implements CoordInterface {
                    dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
                    dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
                    dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
+                    destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
params.destinations.add(dest);
}
}
@@ -1732,6 +1735,7 @@ public class Coordinator implements CoordInterface {
                            dest.fragment_instance_id = instanceExecParams.instanceId;
                            dest.server = toRpcHost(instanceExecParams.host);
                            dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                            instanceExecParams.recvrId = params.destinations.size();
break;
}
}
@@ -1752,6 +1756,7 @@ public class Coordinator implements CoordInterface {
param.buildHashTableForBroadcastJoin = true;
                        TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = param.instanceId;
+ param.recvrId = params.destinations.size();
try {
dest.server = toRpcHost(param.host);
dest.setBrpcServer(toBrpcHost(param.host));
@@ -1773,6 +1778,7 @@ public class Coordinator implements CoordInterface {
                    dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
                    dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
                    dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
+                    destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
destinations.add(dest);
}
}
@@ -3755,22 +3761,26 @@ public class Coordinator implements CoordInterface {
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
params.setPerNodeSharedScans(perNodeSharedScans);
+ params.setTotalInstances(instanceExecParams.size());
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
}
res.put(instanceExecParam.host, params);
                res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
+                res.get(instanceExecParam.host).setShuffleIdxToInstanceIdx(new HashMap<Integer, Integer>());
instanceIdx.put(instanceExecParam.host, 0);
}
// Set each bucket belongs to which instance on this BE.
// This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
int instanceId = instanceIdx.get(instanceExecParam.host);
+
for (int bucket : instanceExecParam.bucketSeqSet) {
                res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);
-
}
instanceIdx.replace(instanceExecParam.host, ++instanceId);
            TPipelineFragmentParams params = res.get(instanceExecParam.host);
+            res.get(instanceExecParam.host).getShuffleIdxToInstanceIdx().put(instanceExecParam.recvrId,
+                    params.getLocalParams().size());
            TPipelineInstanceParams localParams = new TPipelineInstanceParams();
localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
@@ -3919,6 +3929,8 @@ public class Coordinator implements CoordInterface {
boolean buildHashTableForBroadcastJoin = false;
+ int recvrId = -1;
+
List<TUniqueId> instancesSharingHashTable = Lists.newArrayList();
public void addBucketSeq(int bucketSeq) {
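For intuition, suppose a fragment runs four instances across two backends, two per backend. The Coordinator numbers destinations 0..3 (recvrId in destination order), and each backend's shuffleIdxToInstanceIdx maps the recvrId of a locally hosted instance to its position in that backend's local_params. A hypothetical C++ sketch of the resulting per-backend maps (C++ chosen to match the BE code that consumes them; the values are illustrative, not taken from the patch):

    #include <iostream>
    #include <map>

    int main() {
        // Backend A hosts global instances 0 and 1; backend B hosts 2 and 3.
        // Each backend receives only the entries for its own instances.
        std::map<int, int> backend_a = {{0, 0}, {1, 1}}; // recvrId -> local index
        std::map<int, int> backend_b = {{2, 0}, {3, 1}};

        // On backend B, rows hashed to global partition 3 are enqueued to
        // local queue 1 instead of (incorrectly) queue 3.
        std::cout << "partition 3 -> local queue " << backend_b.at(3) << "\n";
        return 0;
    }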
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 818643f6ba6..7559451e373 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -715,6 +715,8 @@ struct TPipelineFragmentParams {
35: optional map<i32, i32> bucket_seq_to_instance_idx
36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
37: optional i32 parallel_instances
+ 38: optional i32 total_instances
+ 39: optional map<i32, i32> shuffle_idx_to_instance_idx
// For cloud
1000: optional bool is_mow_table;
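Both new fields are declared optional, so the Thrift change is wire-compatible in both directions: an upgraded FE talking to an old BE sends fields the BE ignores, and an old FE leaves them unset, which the BE detects via __isset (see prepare() above).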
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]