This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e841d82ffba577b26b1bc07e83856d188ede7a24
Author: Qi Chen <[email protected]>
AuthorDate: Fri Apr 12 13:09:44 2024 +0800

    [Enhancement](hive-writer) Adjust table sink exchange rebalancer params. (#33397)
    
    Issue Number:  #31442
    
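    Change the table sink exchange rebalancer params to node level, and adjust these params to improve write performance through better balancing. "Node level" means each configured threshold now applies to the whole node: each pipeline task uses the configured value divided by the pipeline's task count, falling back to the full value if that quotient would be zero.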
    
    rebalancer params:
    ```
    DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
                  "26214400"); // 25MB
    // Minimum partition data processed to rebalance writers in exchange when partition writing
    DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
                  "15728640"); // 15MB
    ```
---
 be/src/common/config.cpp                             | 13 +++++++------
 be/src/common/config.h                               |  8 ++++----
 be/src/pipeline/exec/exchange_sink_operator.cpp      | 20 +++++++++++++-------
 .../pipeline_x/pipeline_x_fragment_context.cpp       |  1 +
 be/src/runtime/runtime_state.h                       |  5 +++++
 5 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 17a5587f073..dd5d68dfbf0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1184,14 +1184,15 @@ DEFINE_mString(ca_cert_file_paths,
                "/etc/ssl/ca-bundle.pem");
 
 /** Table sink configurations(currently contains only external table types) **/
-// Minimum data processed to scale writers when non partition writing
+// Minimum data processed to scale writers in exchange when non partition 
writing
 DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold,
-              "125829120"); // 120MB
-// Minimum data processed to start rebalancing in exchange when partition 
writing
-DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, 
"209715200"); // 200MB
+              "26214400"); // 25MB
 // Minimum data processed to trigger skewed partition rebalancing in exchange 
when partition writing
-DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold,
-              "209715200"); // 200MB
+DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
+              "26214400"); // 25MB
+// Minimum partition data processed to rebalance writers in exchange when 
partition writing
+DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
+              "15728640"); // 15MB
 // Maximum processed partition nums of per writer when partition writing
 DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ae39a6e5eb4..d8c77bf9496 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1258,12 +1258,12 @@ DECLARE_String(tmp_file_dir);
 DECLARE_mString(ca_cert_file_paths);
 
 /** Table sink configurations(currently contains only external table types) **/
-// Minimum data processed to scale writers when non partition writing
+// Minimum data processed to scale writers in exchange when non partition 
writing
 
DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold);
-// Minimum data processed to start rebalancing in exchange when partition 
writing
-DECLARE_mInt64(table_sink_partition_write_data_processed_threshold);
 // Minimum data processed to trigger skewed partition rebalancing in exchange 
when partition writing
-DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold);
+DECLARE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold);
+// Minimum partition data processed to rebalance writers in exchange when 
partition writing
+DECLARE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold);
 // Maximum processed partition nums of per writer when partition writing
 DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer);
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index bc55bc8f805..8323e20cfd1 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -257,17 +257,23 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
         _partitioner.reset(
                 new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count));
         _partition_function.reset(new 
HashPartitionFunction(_partitioner.get()));
-        //        const long MEGABYTE = 1024 * 1024;
-        //        const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD 
= 10000 * MEGABYTE; // 1MB
-        //        const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50000 * 
MEGABYTE;           // 50MB
 
-        //        const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD 
= 1; // 1MB
-        //        const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1;       
    // 50MB
         scale_writer_partitioning_exchanger.reset(new 
vectorized::ScaleWriterPartitioningExchanger<
                                                   HashPartitionFunction>(
                 channels.size(), *_partition_function, _partition_count, 
channels.size(), 1,
-                config::table_sink_partition_write_data_processed_threshold,
-                
config::table_sink_partition_write_skewed_data_processed_rebalance_threshold));
+                
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
 /
+                                        state->task_num() ==
+                                0
+                        ? 
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
+                        : 
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
 /
+                                  state->task_num(),
+                
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
+                                        state->task_num() ==
+                                0
+                        ? 
config::table_sink_partition_write_min_data_processed_rebalance_threshold
+                        : 
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
+                                  state->task_num()));
+
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 61c0bbfbf96..5d49aaf408e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -591,6 +591,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                 init_runtime_state(task_runtime_state);
                 auto cur_task_id = _total_tasks++;
                 task_runtime_state->set_task_id(cur_task_id);
+                task_runtime_state->set_task_num(pipeline->num_tasks());
                 auto task = std::make_unique<PipelineXTask>(
                         pipeline, cur_task_id, 
get_task_runtime_state(cur_task_id), this,
                         pipeline_id_to_profile[pip_idx].get(), 
get_local_exchange_state(pipeline),
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 07655c71b6c..644db3e32eb 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -619,6 +619,10 @@ public:
 
     int task_id() const { return _task_id; }
 
+    void set_task_num(int task_num) { _task_num = task_num; }
+
+    int task_num() const { return _task_num; }
+
 private:
     Status create_error_log_file();
 
@@ -729,6 +733,7 @@ private:
     std::vector<TErrorTabletInfo> _error_tablet_infos;
     int _max_operator_id = 0;
     int _task_id = -1;
+    int _task_num = 0;
 
     std::vector<THivePartitionUpdate> _hive_partition_updates;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to