This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fbe0e6e80e8 branch-3.0: [fix](local shuffle) Fix unbalanced data distribution #44137 (#44234)
fbe0e6e80e8 is described below

commit fbe0e6e80e8f4b0d046f4fe21fa9ca65f46c4398
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 26 10:47:19 2024 +0800

    branch-3.0: [fix](local shuffle) Fix unbalanced data distribution #44137 (#44234)
    
    Cherry-picked from #44137
    
    Co-authored-by: Gabriel <[email protected]>
---
 .../local_exchange/local_exchange_sink_operator.cpp         |  1 +
 be/src/pipeline/local_exchange/local_exchanger.cpp          | 13 -------------
 be/src/pipeline/local_exchange/local_exchanger.h            |  9 +++++----
 be/src/pipeline/pipeline_fragment_context.cpp               |  6 +++---
 4 files changed, 9 insertions(+), 20 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 00fa7f5ae79..22007a4b220 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -62,6 +62,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, 
const int num_buckets
                 _num_partitions));
         RETURN_IF_ERROR(_partitioner->init(_texprs));
     } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
+        DCHECK_GT(num_buckets, 0);
         _partitioner.reset(
                 new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(num_buckets));
         RETURN_IF_ERROR(_partitioner->init(_texprs));
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index c9f98db26a9..824843d970c 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -226,19 +226,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
                 new_block_wrapper->unref(local_state._shared_state, 
local_state._channel_id);
             }
         }
-    } else if (_num_senders != _num_sources) {
-        // In this branch, data just should be distributed equally into all 
instances.
-        new_block_wrapper->ref(_num_partitions);
-        for (size_t i = 0; i < _num_partitions; i++) {
-            uint32_t start = local_state._partition_rows_histogram[i];
-            uint32_t size = local_state._partition_rows_histogram[i + 1] - 
start;
-            if (size > 0) {
-                _enqueue_data_and_set_ready(i % _num_sources, local_state,
-                                            {new_block_wrapper, {row_idx, 
start, size}});
-            } else {
-                new_block_wrapper->unref(local_state._shared_state, 
local_state._channel_id);
-            }
-        }
     } else {
         DCHECK(!bucket_seq_to_instance_idx.empty());
         new_block_wrapper->ref(_num_partitions);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index cc33efbb934..e8aa35c2f7c 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -219,9 +219,7 @@ protected:
     ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
                      int free_block_limit)
             : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, 
num_partitions,
-                                          free_block_limit) {
-        _data_queue.resize(num_partitions);
-    }
+                                          free_block_limit) {}
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict 
channel_ids,
                        vectorized::Block* block, LocalExchangeSinkLocalState& 
local_state);
 };
@@ -231,7 +229,10 @@ class BucketShuffleExchanger final : public 
ShuffleExchanger {
     BucketShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
                            int free_block_limit)
             : ShuffleExchanger(running_sink_operators, num_sources, 
num_partitions,
-                               free_block_limit) {}
+                               free_block_limit) {
+        DCHECK_GT(num_partitions, 0);
+        _data_queue.resize(std::max(num_partitions, num_sources));
+    }
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index b7aa8518422..553e059d1a5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -938,9 +938,9 @@ Status PipelineFragmentContext::_plan_local_exchange(
         // if 'num_buckets == 0' means the fragment is colocated by exchange 
node not the
         // scan node. so here use `_num_instance` to replace the `num_buckets` 
to prevent dividing 0
         // still keep colocate plan after local shuffle
-        RETURN_IF_ERROR(_plan_local_exchange(
-                _use_serial_source || num_buckets == 0 ? _num_instances : 
num_buckets, pip_idx,
-                _pipelines[pip_idx], bucket_seq_to_instance_idx, 
shuffle_idx_to_instance_idx));
+        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, 
_pipelines[pip_idx],
+                                             bucket_seq_to_instance_idx,
+                                             shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to