This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 92e0221bf77 [branch-3.0](pick) pick #44421 #44459 #44545 (#44591)
92e0221bf77 is described below
commit 92e0221bf77c54e72f27d4f91dda33e3db1bd162
Author: Gabriel <[email protected]>
AuthorDate: Tue Nov 26 17:48:03 2024 +0800
[branch-3.0](pick) pick #44421 #44459 #44545 (#44591)
pick #44421 #44459 #44545
---
be/src/exprs/runtime_filter.cpp | 7 ++++---
be/src/pipeline/local_exchange/local_exchanger.h | 16 +++++++---------
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/runtime/fragment_mgr.cpp | 8 ++++----
.../src/main/java/org/apache/doris/qe/Coordinator.java | 14 ++++++--------
.../java/org/apache/doris/qe/NereidsCoordinator.java | 2 --
gensrc/thrift/PaloInternalService.thrift | 4 ++--
7 files changed, 24 insertions(+), 29 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 78d13aac279..7c5de4891f5 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1259,7 +1259,8 @@ void IRuntimeFilter::update_state() {
// In pipelineX, runtime filters will be ready or timeout before open phase.
if (expected == RuntimeFilterState::NOT_READY) {
DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
- COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
+ COUNTER_SET(_wait_timer,
+ int64_t((MonotonicMillis() - registration_time_) *
NANOS_PER_MILLIS));
_rf_state_atomic = RuntimeFilterState::TIME_OUT;
}
}
@@ -1278,7 +1279,7 @@ PrimitiveType IRuntimeFilter::column_type() const {
void IRuntimeFilter::signal() {
DCHECK(is_consumer());
- COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
+ COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_)
* NANOS_PER_MILLIS));
_rf_state_atomic.store(RuntimeFilterState::READY);
if (!_filter_timer.empty()) {
for (auto& timer : _filter_timer) {
@@ -1524,7 +1525,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile*
parent_profile) {
void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t
local_merge_time) {
_profile->add_info_string("RealRuntimeFilterType",
to_string(_wrapper->get_real_type()));
_profile->add_info_string("LocalMergeTime",
- std::to_string(local_merge_time / 1000000000.0)
+ " s");
+ std::to_string((double)local_merge_time /
NANOS_PER_SEC) + " s");
}
std::string IRuntimeFilter::debug_string() const {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index e8aa35c2f7c..af95e5348c8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -52,7 +52,7 @@ public:
ExchangerBase(int running_sink_operators, int num_sources, int
num_partitions,
int free_block_limit)
: _running_sink_operators(running_sink_operators),
- _running_source_operators(num_partitions),
+ _running_source_operators(num_sources),
_num_partitions(num_partitions),
_num_senders(running_sink_operators),
_num_sources(num_sources),
@@ -201,10 +201,13 @@ struct BlockWrapper {
class ShuffleExchanger : public Exchanger<PartitionedBlock> {
public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
- ShuffleExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger<PartitionedBlock>(running_sink_operators,
num_partitions,
+ ShuffleExchanger(int running_sink_operators, int num_sources, int
num_partitions,
+ int free_block_limit)
+ : Exchanger<PartitionedBlock>(running_sink_operators, num_sources,
num_partitions,
free_block_limit) {
- _data_queue.resize(num_partitions);
+ DCHECK_GT(num_partitions, 0);
+ DCHECK_GT(num_sources, 0);
+ _data_queue.resize(num_sources);
}
~ShuffleExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -216,10 +219,6 @@ public:
ExchangeType get_type() const override { return
ExchangeType::HASH_SHUFFLE; }
protected:
- ShuffleExchanger(int running_sink_operators, int num_sources, int
num_partitions,
- int free_block_limit)
- : Exchanger<PartitionedBlock>(running_sink_operators, num_sources,
num_partitions,
- free_block_limit) {}
Status _split_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
vectorized::Block* block, LocalExchangeSinkLocalState&
local_state);
};
@@ -231,7 +230,6 @@ class BucketShuffleExchanger final : public
ShuffleExchanger {
: ShuffleExchanger(running_sink_operators, num_sources,
num_partitions,
free_block_limit) {
DCHECK_GT(num_partitions, 0);
- _data_queue.resize(std::max(num_partitions, num_sources));
}
~BucketShuffleExchanger() override = default;
ExchangeType get_type() const override { return
ExchangeType::BUCKET_HASH_SHUFFLE; }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 553e059d1a5..01c14f1ddb3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -747,7 +747,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
switch (data_distribution.distribution_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
- std::max(cur_pipe->num_tasks(), _num_instances),
+ std::max(cur_pipe->num_tasks(), _num_instances),
_num_instances,
use_global_hash_shuffle ? _total_instances : _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
?
_runtime_state->query_options().local_exchange_free_blocks_limit
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fbbc079c30a..81f2416d3b3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1261,8 +1261,8 @@ Status FragmentMgr::sync_filter_size(const
PSyncFilterSizeRequest* request) {
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
- return Status::InvalidArgument(
- "Sync filter size failed: Query context (query-id: {}) not
found",
+ return Status::EndOfFile(
+ "Sync filter size failed: Query context (query-id: {})
already finished",
queryid.to_string());
}
}
@@ -1282,8 +1282,8 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request,
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
- return Status::InvalidArgument(
- "Merge filter size failed: Query context (query-id: {})
not found",
+ return Status::EndOfFile(
+ "Merge filter size failed: Query context (query-id: {})
already finished",
queryid.to_string());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8be50772094..ca8a397bd5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1938,7 +1938,6 @@ public class Coordinator implements CoordInterface {
FInstanceExecParam instanceParam = new
FInstanceExecParam(null, key, 0, params);
instanceParam.perNodeScanRanges.put(planNodeId,
scanRangeParams);
- instanceParam.perNodeSharedScans.put(planNodeId,
sharedScan);
params.instanceExecParams.add(instanceParam);
}
params.ignoreDataDistribution = sharedScan;
@@ -2760,13 +2759,11 @@ public class Coordinator implements CoordInterface {
null, addressScanRange.getKey(), 0, params);
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>>
nodeScanRangeMap : scanRange) {
- instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>>
nodeScanRange
: nodeScanRangeMap.second.entrySet()) {
if
(!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(),
Lists.newArrayList());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(),
Lists.newArrayList());
-
instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true);
}
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
@@ -2778,6 +2775,12 @@ public class Coordinator implements CoordInterface {
params.instanceExecParams.add(new FInstanceExecParam(
null, addressScanRange.getKey(), 0, params));
}
+ int index = 0;
+ for (Pair<Integer, Map<Integer, List<TScanRangeParams>>>
nodeScanRangeMap : scanRange) {
+ params.instanceExecParams.get(index %
params.instanceExecParams.size())
+ .addBucketSeq(nodeScanRangeMap.first);
+ index++;
+ }
} else {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
@@ -3111,10 +3114,8 @@ public class Coordinator implements CoordInterface {
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam =
instanceExecParams.get(i);
Map<Integer, List<TScanRangeParams>> scanRanges =
instanceExecParam.perNodeScanRanges;
- Map<Integer, Boolean> perNodeSharedScans =
instanceExecParam.perNodeSharedScans;
if (scanRanges == null) {
scanRanges = Maps.newHashMap();
- perNodeSharedScans = Maps.newHashMap();
}
if (!res.containsKey(instanceExecParam.host)) {
TPipelineFragmentParams params = new
TPipelineFragmentParams();
@@ -3142,7 +3143,6 @@ public class Coordinator implements CoordInterface {
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
- params.setPerNodeSharedScans(perNodeSharedScans);
params.setTotalInstances(instanceExecParams.size());
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
@@ -3167,7 +3167,6 @@ public class Coordinator implements CoordInterface {
localParams.setFragmentInstanceId(instanceExecParam.instanceId);
localParams.setPerNodeScanRanges(scanRanges);
- localParams.setPerNodeSharedScans(perNodeSharedScans);
localParams.setSenderId(i);
localParams.setBackendNum(backendNum++);
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
@@ -3315,7 +3314,6 @@ public class Coordinator implements CoordInterface {
TUniqueId instanceId;
TNetworkAddress host;
Map<Integer, List<TScanRangeParams>> perNodeScanRanges =
Maps.newHashMap();
- Map<Integer, Boolean> perNodeSharedScans = Maps.newHashMap();
int perFragmentInstanceIdx;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index 4f5af3762c5..6cb42a116e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -140,7 +140,6 @@ public class NereidsCoordinator extends Coordinator {
ScanNode scanNode = scanNodeIdToReplicaIds.getKey();
ScanRanges scanReplicas = scanNodeIdToReplicaIds.getValue();
instanceExecParam.perNodeScanRanges.put(scanNode.getId().asInt(),
scanReplicas.params);
- instanceExecParam.perNodeSharedScans.put(scanNode.getId().asInt(),
isShareScan);
}
}
@@ -157,7 +156,6 @@ public class NereidsCoordinator extends Coordinator {
List<TScanRangeParams> scanBucketTablets =
instanceExecParam.perNodeScanRanges.computeIfAbsent(
scanNode.getId().asInt(), id -> Lists.newArrayList());
scanBucketTablets.addAll(scanRanges.params);
-
instanceExecParam.perNodeSharedScans.put(scanNode.getId().asInt(), isShareScan);
if (scanNode instanceof OlapScanNode) {
OlapScanNode olapScanNode = (OlapScanNode) scanNode;
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 11924f7ba8b..f3f9e7668d1 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -772,7 +772,7 @@ struct TPipelineInstanceParams {
4: optional i32 sender_id
5: optional TRuntimeFilterParams runtime_filter_params
6: optional i32 backend_num
- 7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+ 7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
8: optional list<i32> topn_filter_source_node_ids // deprecated after we set topn_filter_descs
9: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs
}
@@ -816,7 +816,7 @@ struct TPipelineFragmentParams {
33: optional i32 num_local_sink
34: optional i32 num_buckets
35: optional map<i32, i32> bucket_seq_to_instance_idx
- 36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+ 36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
37: optional i32 parallel_instances
38: optional i32 total_instances
39: optional map<i32, i32> shuffle_idx_to_instance_idx
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]