This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 819ab6fc002d5763040a781b059a45c3429038fd
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Feb 29 20:09:24 2024 +0800

    [feature](sink) support partition tablet sink shuffle (#30914)
    
    Co-authored-by: morrySnow <[email protected]>
---
 be/src/vec/sink/vdata_stream_sender.cpp            | 102 ++++++++++++++++++++-
 be/src/vec/sink/vdata_stream_sender.h              |  41 ++++++++-
 be/src/vec/sink/vrow_distribution.h                |   3 +
 .../glue/translator/PhysicalPlanTranslator.java    |   3 +
 .../DistributionSpecTabletIdShuffle.java           |  35 +++++++
 .../nereids/properties/PhysicalProperties.java     |   3 +
 .../trees/plans/commands/InsertExecutor.java       |  26 +++++-
 .../plans/commands/InsertIntoTableCommand.java     |  15 ++-
 .../plans/physical/PhysicalOlapTableSink.java      |  21 +----
 .../org/apache/doris/planner/DataPartition.java    |  34 +++----
 .../org/apache/doris/planner/DataStreamSink.java   |  46 ++++++++++
 .../org/apache/doris/planner/OlapTableSink.java    |  22 ++++-
 .../java/org/apache/doris/qe/SessionVariable.java  |   9 ++
 gensrc/thrift/DataSinks.thrift                     |   7 +-
 14 files changed, 310 insertions(+), 57 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index e690c8eb591..a7f1b4b8584 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -23,15 +23,19 @@
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/data.pb.h>
 #include <gen_cpp/internal_service.pb.h>
+#include <glog/logging.h>
 #include <stddef.h>
 
 #include <algorithm>
+#include <cstdint>
 #include <functional>
 #include <map>
+#include <memory>
 #include <random>
 
 #include "common/object_pool.h"
 #include "common/status.h"
+#include "exec/tablet_info.h"
 #include "pipeline/exec/exchange_sink_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "runtime/descriptors.h"
@@ -41,10 +45,13 @@
 #include "runtime/types.h"
 #include "util/proto_util.h"
 #include "vec/columns/column_const.h"
+#include "vec/columns/columns_number.h"
 #include "vec/common/sip_hash.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 #include "vec/runtime/vdata_stream_recvr.h"
+#include "vec/sink/vrow_distribution.h"
+#include "vec/sink/writer/vtablet_writer_v2.h"
 
 namespace doris::vectorized {
 
@@ -329,6 +336,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, 
ObjectPool* pool, int
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
            sink.output_partition.type == TPartitionType::RANDOM ||
            sink.output_partition.type == TPartitionType::RANGE_PARTITIONED ||
+           sink.output_partition.type == 
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
            sink.output_partition.type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
 
     std::map<int64_t, int64_t> fragment_id_to_channel_index;
@@ -412,6 +420,34 @@ Status VDataStreamSender::init(const TDataSink& tsink) {
         
RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs));
     } else if (_part_type == TPartitionType::RANGE_PARTITIONED) {
         return Status::InternalError("TPartitionType::RANGE_PARTITIONED should 
not be used");
+    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        _txn_id = t_stream_sink.tablet_sink_txn_id;
+        _schema = std::make_shared<OlapTableSchemaParam>();
+        RETURN_IF_ERROR(_schema->init(t_stream_sink.tablet_sink_schema));
+        _vpartition = std::make_unique<VOlapTablePartitionParam>(
+                _schema, t_stream_sink.tablet_sink_partition);
+        RETURN_IF_ERROR(_vpartition->init());
+        auto find_tablet_mode = 
OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
+        _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition.get(), 
find_tablet_mode);
+        _tablet_sink_tuple_desc =
+                
_state->desc_tbl().get_tuple_descriptor(t_stream_sink.tablet_sink_tuple_id);
+        _tablet_sink_row_desc = _pool->add(new 
RowDescriptor(_tablet_sink_tuple_desc, false));
+        //_block_convertor no need init_autoinc_info here
+        _block_convertor =
+                
std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
+        _location = _pool->add(new 
OlapTableLocationParam(t_stream_sink.tablet_sink_location));
+        _row_distribution.init({.state = _state,
+                                .block_convertor = _block_convertor.get(),
+                                .tablet_finder = _tablet_finder.get(),
+                                .vpartition = _vpartition.get(),
+                                .add_partition_request_timer = 
_add_partition_request_timer,
+                                .txn_id = _txn_id,
+                                .pool = _pool,
+                                .location = _location,
+                                .vec_output_expr_ctxs = &_fake_expr_ctxs,
+                                .schema = _schema,
+                                .caller = (void*)this,
+                                .create_partition_callback = 
&empty_callback_function});
     } else {
         // UNPARTITIONED
     }
@@ -488,6 +524,8 @@ Status VDataStreamSender::open(RuntimeState* state) {
     if (_part_type == TPartitionType::HASH_PARTITIONED ||
         _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         RETURN_IF_ERROR(_partitioner->open(state));
+    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
     }
 
     _compression_type = state->fragement_transmission_compression_type();
@@ -502,6 +540,25 @@ void VDataStreamSender::_handle_eof_channel(RuntimeState* 
state, ChannelPtrType
     static_cast<void>(channel->close(state, Status::OK()));
 }
 
+Status VDataStreamSender::_send_new_partition_batch() {
+    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 
1 time
+        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+        Block tmp_block = _row_distribution._batching_block->to_block(); // 
Borrow out, for lval ref
+
+        // these order is only.
+        //  1. clear batching stats(and flag goes true) so that we won't make 
a new batching process in dealing batched block.
+        //  2. deal batched block
+        //  3. now reuse the column of lval block. cuz write doesn't real 
adjust it. it generate a new block from that.
+        _row_distribution.clear_batching_stats();
+        RETURN_IF_ERROR(this->send(_state, &tmp_block, false));
+        // Recovery back
+        
_row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
+        _row_distribution._batching_block->clear_column_data();
+        _row_distribution._deal_batched = false;
+    }
+    return Status::OK();
+}
+
 Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_TIMER(_exec_timer);
@@ -627,6 +684,38 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block, bool eos) {
                                              
(uint32_t*)_partitioner->get_channel_ids(), rows,
                                              block, _enable_pipeline_exec ? 
eos : false));
         }
+    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        // check out of limit
+        RETURN_IF_ERROR(_send_new_partition_batch());
+        if (UNLIKELY(block->rows() == 0)) {
+            return Status::OK();
+        }
+        std::shared_ptr<vectorized::Block> convert_block;
+        bool has_filtered_rows = false;
+        int64_t filtered_rows = 0;
+        _number_input_rows += block->rows();
+        RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
+                *block, convert_block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids,
+                _number_input_rows));
+
+        const auto& row_ids = _row_part_tablet_ids[0].row_ids;
+        const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
+        const auto& num_channels = _channels.size();
+        std::vector<std::vector<uint32>> channel2rows;
+        channel2rows.resize(num_channels);
+        for (int idx = 0; idx < row_ids.size(); ++idx) {
+            const auto& row = row_ids[idx];
+            const auto& tablet_id = tablet_ids[idx];
+            channel2rows[tablet_id % num_channels].emplace_back(row);
+        }
+
+        RETURN_IF_ERROR(channel_add_rows_with_idx(state, _channels, 
num_channels, channel2rows,
+                                                  convert_block.get(),
+                                                  _enable_pipeline_exec ? eos 
: false));
+        if (eos) {
+            _row_distribution._deal_batched = true;
+            RETURN_IF_ERROR(_send_new_partition_batch());
+        }
     } else {
         // Range partition
         // 1. calculate range
@@ -662,6 +751,12 @@ Status VDataStreamSender::close(RuntimeState* state, 
Status exec_status) {
         {
             // send last block
             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            // non pipeline engin not pass eos in send function, and maybe 
have create partition at last block
+            // so at here to check again
+            if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) 
{
+                _row_distribution._deal_batched = true;
+                RETURN_IF_ERROR(_send_new_partition_batch());
+            }
             if (_serializer.get_block() && _serializer.get_block()->rows() > 
0) {
                 Block block = _serializer.get_block()->to_block();
                 RETURN_IF_ERROR(
@@ -694,7 +789,12 @@ Status VDataStreamSender::close(RuntimeState* state, 
Status exec_status) {
             }
         }
     }
-
+    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        
_state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
+                                              
_tablet_finder->num_filtered_rows());
+        _state->update_num_rows_load_unselected(
+                _tablet_finder->num_immutable_partition_filtered_rows());
+    }
     if (_peak_memory_usage_counter) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 2834c7e2aa5..88a948ed05c 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -29,6 +29,7 @@
 
 #include <atomic>
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -39,6 +40,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "exec/data_sink.h"
+#include "exec/tablet_info.h"
 #include "pipeline/exec/exchange_sink_buffer.h"
 #include "service/backend_options.h"
 #include "util/ref_count_closure.h"
@@ -48,6 +50,8 @@
 #include "vec/exprs/vexpr_context.h"
 #include "vec/runtime/partitioner.h"
 #include "vec/runtime/vdata_stream_recvr.h"
+#include "vec/sink/vrow_distribution.h"
+#include "vec/sink/vtablet_finder.h"
 
 namespace doris {
 class ObjectPool;
@@ -160,10 +164,18 @@ protected:
     Status channel_add_rows(RuntimeState* state, Channels& channels, int 
num_channels,
                             const HashValueType* __restrict channel_ids, int 
rows, Block* block,
                             bool eos);
+    template <typename Channels>
+    Status channel_add_rows_with_idx(RuntimeState* state, Channels& channels, 
int num_channels,
+                                     std::vector<std::vector<uint32_t>>& 
channel2rows, Block* block,
+                                     bool eos);
 
     template <typename ChannelPtrType>
     void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, 
Status st);
 
+    static Status empty_callback_function(void* sender, 
TCreatePartitionResult* result) {
+        return Status::OK();
+    }
+    Status _send_new_partition_batch();
     // Sender instance id, unique within a fragment.
     int _sender_id;
 
@@ -221,6 +233,22 @@ protected:
     bool _enable_pipeline_exec = false;
 
     BlockSerializer<VDataStreamSender> _serializer;
+
+    // for shuffle data by partition and tablet
+    VRowDistribution _row_distribution;
+    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
+    int64_t _txn_id = -1;
+    RowDescriptor* _tablet_sink_row_desc = nullptr;
+    TupleDescriptor* _tablet_sink_tuple_desc = nullptr;
+    OlapTableLocationParam* _location = nullptr;
+    int64_t _number_input_rows = 0;
+    // reuse to avoid frequent memory allocation and release.
+    std::vector<RowPartTabletIds> _row_part_tablet_ids;
+    vectorized::VExprContextSPtrs _fake_expr_ctxs;
+    std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
+    std::unique_ptr<OlapTabletFinder> _tablet_finder = nullptr;
+    std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
+    std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor = 
nullptr;
 };
 
 template <typename Parent = VDataStreamSender>
@@ -402,12 +430,20 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* 
state, Channels& channe
                                            int num_channels,
                                            const HashValueType* __restrict 
channel_ids, int rows,
                                            Block* block, bool eos) {
-    std::vector<uint32_t> channel2rows[num_channels];
-
+    std::vector<std::vector<uint32_t>> channel2rows;
+    channel2rows.resize(num_channels);
     for (uint32_t i = 0; i < rows; i++) {
         channel2rows[channel_ids[i]].emplace_back(i);
     }
+    RETURN_IF_ERROR(
+            channel_add_rows_with_idx(state, channels, num_channels, 
channel2rows, block, eos));
+    return Status::OK();
+}
 
+template <typename Channels>
+Status VDataStreamSender::channel_add_rows_with_idx(
+        RuntimeState* state, Channels& channels, int num_channels,
+        std::vector<std::vector<uint32_t>>& channel2rows, Block* block, bool 
eos) {
     Status status;
     for (int i = 0; i < num_channels; ++i) {
         if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
@@ -424,7 +460,6 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* 
state, Channels& channe
             }
         }
     }
-
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/vrow_distribution.h 
b/be/src/vec/sink/vrow_distribution.h
index 54cd6f42ce6..12acda73ed2 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -161,8 +161,11 @@ private:
 
     // for auto partitions
     std::vector<std::vector<TStringLiteral>> _partitions_need_create;
+
+public:
     std::unique_ptr<MutableBlock> _batching_block;
     bool _deal_batched = false; // If true, send batched block before any 
block's append.
+private:
     size_t _batching_rows = 0, _batching_bytes = 0;
 
     OlapTableBlockConvertor* _block_convertor = nullptr;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index c9cb534c9c2..c64965080fb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -70,6 +70,7 @@ import 
org.apache.doris.nereids.properties.DistributionSpecHash;
 import org.apache.doris.nereids.properties.DistributionSpecReplicated;
 import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
 import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
+import org.apache.doris.nereids.properties.DistributionSpecTabletIdShuffle;
 import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import 
org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
@@ -2458,6 +2459,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                             + distributionSpecHash.getShuffleType());
             }
             return new DataPartition(partitionType, partitionExprs);
+        } else if (distributionSpec instanceof 
DistributionSpecTabletIdShuffle) {
+            return DataPartition.TABLET_ID;
         } else {
             throw new RuntimeException("Unknown DistributionSpec: " + 
distributionSpec);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTabletIdShuffle.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTabletIdShuffle.java
new file mode 100644
index 00000000000..17a84401c5b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTabletIdShuffle.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+/**
+ * use for shuffle data by tablet-id before sink.
+ */
+public class DistributionSpecTabletIdShuffle extends DistributionSpec {
+
+    public static final DistributionSpecTabletIdShuffle INSTANCE = new 
DistributionSpecTabletIdShuffle();
+
+    private DistributionSpecTabletIdShuffle() {
+        super();
+    }
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecTabletIdShuffle;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 4fa32dbfb69..cc5d7db6a08 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -46,6 +46,9 @@ public class PhysicalProperties {
 
     public static PhysicalProperties MUST_SHUFFLE = new 
PhysicalProperties(DistributionSpecMustShuffle.INSTANCE);
 
+    public static PhysicalProperties TABLET_ID_SHUFFLE
+            = new PhysicalProperties(DistributionSpecTabletIdShuffle.INSTANCE);
+
     private final OrderSpec orderSpec;
 
     private final DistributionSpec distributionSpec;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
index 45471429bba..2c5ee559f3d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
@@ -60,7 +60,10 @@ import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.util.RelationUtil;
 import org.apache.doris.nereids.util.TypeCoercionUtils;
 import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.DataStreamSink;
+import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.OlapTableSink;
+import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
@@ -74,6 +77,8 @@ import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TMergeType;
+import org.apache.doris.thrift.TOlapTableLocationParam;
+import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
@@ -153,8 +158,8 @@ public class InsertExecutor {
     /**
      * finalize sink to complete enough info for sink execution
      */
-    public void finalizeSink(DataSink sink, boolean isPartialUpdate, boolean 
isFromInsert,
-            boolean allowAutoPartition) {
+    public void finalizeSink(PlanFragment fragment, DataSink sink,
+            boolean isPartialUpdate, boolean isFromInsert, boolean 
allowAutoPartition) {
         if (!(sink instanceof OlapTableSink)) {
             return;
         }
@@ -175,6 +180,23 @@ public class InsertExecutor {
             if (!allowAutoPartition) {
                 olapTableSink.setAutoPartition(false);
             }
+            // update
+
+            // set schema and partition info for tablet id shuffle exchange
+            if (fragment.getPlanRoot() instanceof ExchangeNode
+                    && fragment.getDataPartition().getType() == 
TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
+                DataStreamSink dataStreamSink = (DataStreamSink) 
(fragment.getChild(0).getSink());
+                Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), 
ConnectContext.get());
+                
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
+                        database.getId(), olapTableSink.getDstTable(), 
analyzer));
+                
dataStreamSink.setTabletSinkPartitionParam(olapTableSink.createPartition(
+                        database.getId(), olapTableSink.getDstTable(), 
analyzer));
+                
dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
+                List<TOlapTableLocationParam> locationParams = olapTableSink
+                        .createLocation(olapTableSink.getDstTable());
+                
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
+                dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
+            }
         } catch (Exception e) {
             throw new AnalysisException(e.getMessage(), e);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index d5101aabcda..166219dfae1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -37,9 +37,12 @@ import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.GroupCommitPlanner;
@@ -183,7 +186,7 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
                     physicalOlapTableSink.getDatabase(),
                     physicalOlapTableSink.getTargetTable(), label, planner);
             insertExecutor.beginTransaction();
-            insertExecutor.finalizeSink(sink, 
physicalOlapTableSink.isPartialUpdate(),
+            insertExecutor.finalizeSink(planner.getFragments().get(0), sink, 
physicalOlapTableSink.isPartialUpdate(),
                     physicalOlapTableSink.getDmlCommandType() == 
DMLCommandType.INSERT, this.allowAutoPartition);
         } finally {
             targetTableIf.readUnlock();
@@ -254,12 +257,20 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         }
         OlapTable targetTable = physicalOlapTableSink.getTargetTable();
         return ctx.getSessionVariable().getSqlMode() != 
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
-                && !ctx.isTxnModel() && sink.getFragment().getPlanRoot() 
instanceof UnionNode
+                && !ctx.isTxnModel() && 
isGroupCommitAvailablePlan(physicalOlapTableSink)
                 && physicalOlapTableSink.getPartitionIds().isEmpty() && 
targetTable.getTableProperty()
                 .getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
                 .equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
     }
 
+    private boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extends 
Plan> sink) {
+        Plan child = sink.child();
+        if (child instanceof PhysicalDistribute) {
+            child = child.child(0);
+        }
+        return child instanceof OneRowRelation || (child instanceof 
PhysicalUnion && child.arity() == 0);
+    }
+
     @Override
     public Plan getExplainPlan(ConnectContext ctx) {
         if (!ctx.getSessionVariable().isEnableNereidsDML()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
index 11a85ed100f..86a6a4b8c61 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
@@ -25,7 +25,6 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.memo.GroupExpression;
-import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.expressions.Expression;
@@ -42,13 +41,11 @@ import org.apache.doris.statistics.Statistics;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * physical olap table sink for insert command
@@ -218,23 +215,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends 
Plan> extends PhysicalSink
         if (targetTable.isPartitionDistributed()) {
             DistributionInfo distributionInfo = 
targetTable.getDefaultDistributionInfo();
             if (distributionInfo instanceof HashDistributionInfo) {
-                HashDistributionInfo hashDistributionInfo
-                        = ((HashDistributionInfo) 
targetTable.getDefaultDistributionInfo());
-                List<Column> distributedColumns = 
hashDistributionInfo.getDistributionColumns();
-                List<Integer> columnIndexes = Lists.newArrayList();
-                int idx = 0;
-                for (int i = 0; i < targetTable.getFullSchema().size(); ++i) {
-                    if 
(targetTable.getFullSchema().get(i).equals(distributedColumns.get(idx))) {
-                        columnIndexes.add(i);
-                        idx++;
-                        if (idx == distributedColumns.size()) {
-                            break;
-                        }
-                    }
-                }
-                return PhysicalProperties.createHash(columnIndexes.stream()
-                        .map(colIdx -> 
child().getOutput().get(colIdx).getExprId())
-                        .collect(Collectors.toList()), ShuffleType.NATURAL);
+                return PhysicalProperties.TABLET_ID_SHUFFLE;
             } else if (distributionInfo instanceof RandomDistributionInfo) {
                 return PhysicalProperties.ANY;
             } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 75352086940..9c5c375a35c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -32,8 +32,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 
@@ -46,16 +44,14 @@ import java.util.List;
  * TODO: better name? just Partitioning?
  */
 public class DataPartition {
-    private static final Logger LOG = 
LogManager.getLogger(DataPartition.class);
 
     public static final DataPartition UNPARTITIONED = new 
DataPartition(TPartitionType.UNPARTITIONED);
-
     public static final DataPartition RANDOM = new 
DataPartition(TPartitionType.RANDOM);
+    public static final DataPartition TABLET_ID = new 
DataPartition(TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED);
 
     private final TPartitionType type;
-
     // for hash partition: exprs used to compute hash value
-    private ImmutableList<Expr> partitionExprs = ImmutableList.of();
+    private ImmutableList<Expr> partitionExprs;
 
     public DataPartition(TPartitionType type, List<Expr> exprs) {
         Preconditions.checkNotNull(exprs);
@@ -67,13 +63,10 @@ public class DataPartition {
         this.partitionExprs = ImmutableList.copyOf(exprs);
     }
 
-    public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws 
AnalysisException {
-        List<Expr> list = Expr.trySubstituteList(partitionExprs, smap, 
analyzer, false);
-        partitionExprs = ImmutableList.copyOf(list);
-    }
-
     public DataPartition(TPartitionType type) {
-        Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type 
== TPartitionType.RANDOM);
+        Preconditions.checkState(type == TPartitionType.UNPARTITIONED
+                || type == TPartitionType.RANDOM
+                || type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED);
         this.type = type;
         this.partitionExprs = ImmutableList.of();
     }
@@ -82,6 +75,11 @@ public class DataPartition {
         return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs);
     }
 
+    public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws 
AnalysisException {
+        List<Expr> list = Expr.trySubstituteList(partitionExprs, smap, 
analyzer, false);
+        partitionExprs = ImmutableList.copyOf(list);
+    }
+
     public boolean isPartitioned() {
         return type != TPartitionType.UNPARTITIONED;
     }
@@ -106,16 +104,6 @@ public class DataPartition {
         return result;
     }
 
-    /**
-     * Returns true if 'this' is a partition that is compatible with the
-     * requirements of 's'.
-     * TODO: specify more clearly and implement
-     */
-    public boolean isCompatible(DataPartition s) {
-        // TODO: implement
-        return true;
-    }
-
     public String getExplainString(TExplainLevel explainLevel) {
         StringBuilder str = new StringBuilder();
         str.append(type.toString());
@@ -127,7 +115,7 @@ public class DataPartition {
             for (Expr expr : partitionExprs) {
                 strings.add(expr.toSql());
             }
-            str.append(": " + Joiner.on(", ").join(strings));
+            str.append(": ").append(Joiner.on(", ").join(strings));
         }
         str.append("\n");
         return str.toString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index 4d4a2c641a9..b9cf516bc3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -27,8 +27,12 @@ import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDataStreamSink;
 import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TOlapTableLocationParam;
+import org.apache.doris.thrift.TOlapTablePartitionParam;
+import org.apache.doris.thrift.TOlapTableSchemaParam;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.springframework.util.CollectionUtils;
 
@@ -52,6 +56,13 @@ public class DataStreamSink extends DataSink {
 
     protected List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
 
+    // used for tablet id shuffle sink only
+    protected TOlapTableSchemaParam tabletSinkSchemaParam = null;
+    protected TOlapTablePartitionParam tabletSinkPartitionParam = null;
+    protected TOlapTableLocationParam tabletSinkLocationParam = null;
+    protected TupleDescriptor tabletSinkTupleDesc = null;
+    protected long tabletSinkTxnId = -1;
+
     public DataStreamSink() {
 
     }
@@ -118,6 +129,26 @@ public class DataStreamSink extends DataSink {
         this.runtimeFilters.add(runtimeFilter);
     }
 
+    public void setTabletSinkSchemaParam(TOlapTableSchemaParam schemaParam) {
+        this.tabletSinkSchemaParam = schemaParam;
+    }
+
+    public void setTabletSinkPartitionParam(TOlapTablePartitionParam 
partitionParam) {
+        this.tabletSinkPartitionParam = partitionParam;
+    }
+
+    public void setTabletSinkTupleDesc(TupleDescriptor tupleDesc) {
+        this.tabletSinkTupleDesc = tupleDesc;
+    }
+
+    public void setTabletSinkLocationParam(TOlapTableLocationParam 
locationParam) {
+        this.tabletSinkLocationParam = locationParam;
+    }
+
+    public void setTabletSinkTxnId(long txnId) {
+        this.tabletSinkTxnId = txnId;
+    }
+
     @Override
     public String getExplainString(String prefix, TExplainLevel explainLevel) {
         StringBuilder strBuilder = new StringBuilder();
@@ -179,6 +210,21 @@ public class DataStreamSink extends DataSink {
                 tStreamSink.addToRuntimeFilters(rf.toThrift());
             }
         }
+        Preconditions.checkState((tabletSinkSchemaParam != null) == 
(tabletSinkPartitionParam != null),
+                "schemaParam and partitionParam should be set together.");
+        if (tabletSinkSchemaParam != null) {
+            tStreamSink.setTabletSinkSchema(tabletSinkSchemaParam);
+        }
+        if (tabletSinkPartitionParam != null) {
+            tStreamSink.setTabletSinkPartition(tabletSinkPartitionParam);
+        }
+        if (tabletSinkTupleDesc != null) {
+            
tStreamSink.setTabletSinkTupleId(tabletSinkTupleDesc.getId().asInt());
+        }
+        if (tabletSinkLocationParam != null) {
+            tStreamSink.setTabletSinkLocation(tabletSinkLocationParam);
+        }
+        tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
         result.setStreamSink(tStreamSink);
         return result;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 485fde67fb1..924b64f87cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -110,6 +110,7 @@ public class OlapTableSink extends DataSink {
     private boolean singleReplicaLoad;
 
     private boolean isStrictMode = false;
+    private long txnId = -1;
 
     public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, 
List<Long> partitionIds,
             boolean singleReplicaLoad) {
@@ -129,6 +130,7 @@ public class OlapTableSink extends DataSink {
         tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
         tSink.setSendBatchParallelism(sendBatchParallelism);
         this.isStrictMode = isStrictMode;
+        this.txnId = txnId;
         if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() 
instanceof RandomDistributionInfo)) {
             throw new AnalysisException(
                     "if load_to_single_tablet set to true," + " the olap table 
must be with random distribution");
@@ -237,7 +239,7 @@ public class OlapTableSink extends DataSink {
         return tDataSink;
     }
 
-    private TOlapTableSchemaParam createSchema(long dbId, OlapTable table, 
Analyzer analyzer) throws AnalysisException {
+    public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, 
Analyzer analyzer) throws AnalysisException {
         TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam();
         schemaParam.setDbId(dbId);
         schemaParam.setTableId(table.getId());
@@ -321,7 +323,7 @@ public class OlapTableSink extends DataSink {
         return distColumns;
     }
 
-    private TOlapTablePartitionParam createPartition(long dbId, OlapTable 
table, Analyzer analyzer)
+    public TOlapTablePartitionParam createPartition(long dbId, OlapTable 
table, Analyzer analyzer)
             throws UserException {
         TOlapTablePartitionParam partitionParam = new 
TOlapTablePartitionParam();
         partitionParam.setDbId(dbId);
@@ -479,7 +481,7 @@ public class OlapTableSink extends DataSink {
         }
     }
 
-    private List<TOlapTableLocationParam> createLocation(OlapTable table) 
throws UserException {
+    public List<TOlapTableLocationParam> createLocation(OlapTable table) 
throws UserException {
         TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
         TOlapTableLocationParam slaveLocationParam = new 
TOlapTableLocationParam();
         // BE id -> path hash
@@ -571,7 +573,7 @@ public class OlapTableSink extends DataSink {
         bePathsMap.putAll(result);
     }
 
-    private TPaloNodesInfo createPaloNodesInfo() {
+    public TPaloNodesInfo createPaloNodesInfo() {
         TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
         SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
         for (Long id : systemInfoService.getAllBackendIds(false)) {
@@ -584,4 +586,16 @@ public class OlapTableSink extends DataSink {
     protected TDataSinkType getDataSinkType() {
         return TDataSinkType.OLAP_TABLE_SINK;
     }
+
+    public OlapTable getDstTable() {
+        return dstTable;
+    }
+
+    public TupleDescriptor getTupleDescriptor() {
+        return tupleDescriptor;
+    }
+
+    public long getTxnId() {
+        return txnId;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a5bc477c3f6..e561ded693a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3206,6 +3206,15 @@ public class SessionVariable implements Serializable, 
Writable {
         this.dumpNereidsMemo = dumpNereidsMemo;
     }
 
+    public void disableStrictConsistencyDmlOnce() throws DdlException {
+        if (!enableStrictConsistencyDml) {
+            return;
+        }
+        setIsSingleSetVar(true);
+        VariableMgr.setVar(this,
+                new SetVar(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, new 
StringLiteral("false")));
+    }
+
     public void enableFallbackToOriginalPlannerOnce() throws DdlException {
         if (enableFallbackToOriginalPlanner) {
             return;
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 068b5927004..602943b4207 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -172,8 +172,11 @@ struct TDataStreamSink {
   7: optional list<PlanNodes.TRuntimeFilterDesc> runtime_filters
 
   // used for partition_type = TABLET_SINK_SHUFFLE_PARTITIONED
-  8: optional Descriptors.TOlapTableSchemaParam schema
-  9: optional Descriptors.TOlapTablePartitionParam partition
+  8: optional Descriptors.TOlapTableSchemaParam tablet_sink_schema
+  9: optional Descriptors.TOlapTablePartitionParam tablet_sink_partition
+  10: optional Descriptors.TOlapTableLocationParam tablet_sink_location
+  11: optional i64 tablet_sink_txn_id
+  12: optional Types.TTupleId tablet_sink_tuple_id
 }
 
 struct TMultiCastDataStreamSink {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to