This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new d97642e9b5e [cherry-pick](branch-21) fix tablet sink shuffle without
project not match the output tuple (#40299)(#41293) (#41327)
d97642e9b5e is described below
commit d97642e9b5eff17d15b6f9de8f5dcaf948ff07a8
Author: zhangstar333 <[email protected]>
AuthorDate: Tue Oct 15 00:12:23 2024 +0800
[cherry-pick](branch-21) fix tablet sink shuffle without project not match
the output tuple (#40299)(#41293) (#41327)
## Proposed changes
cherry-pick from master (#40299)(#41293)
<!--Describe your changes.-->
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 33 +++++++-
be/src/pipeline/exec/exchange_sink_operator.h | 5 +-
.../glue/translator/PhysicalPlanTranslator.java | 3 +-
.../plans/commands/insert/OlapInsertExecutor.java | 1 +
.../org/apache/doris/planner/DataStreamSink.java | 10 +++
gensrc/thrift/DataSinks.thrift | 1 +
.../data/nereids_p0/insert_into_table/random.out | 9 +++
.../nereids_p0/insert_into_table/random.groovy | 87 ++++++++++++++++++++++
8 files changed, 143 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e4150b4f7ac..7584c0b0e45 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -18,6 +18,7 @@
#include "exchange_sink_operator.h"
#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Partitions_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
@@ -249,6 +250,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(),
find_tablet_mode);
_tablet_sink_tuple_desc =
_state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id);
_tablet_sink_row_desc = p._pool->add(new
RowDescriptor(_tablet_sink_tuple_desc, false));
+ _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size());
+ for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state,
_tablet_sink_expr_ctxs[i]));
+ }
// if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED,
we handle the processing of auto_increment column
// on exchange node rather than on TabletWriter
_block_convertor =
@@ -265,7 +270,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
.txn_id = _txn_id,
.pool = p._pool.get(),
.location = _location,
- .vec_output_expr_ctxs = &_fake_expr_ctxs,
+ .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
.schema = _schema,
.caller = (void*)this,
.create_partition_callback =
&ExchangeSinkLocalState::empty_callback_function});
@@ -355,7 +360,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_partition(sink.tablet_sink_partition),
_tablet_sink_location(sink.tablet_sink_location),
_tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
- _tablet_sink_txn_id(sink.tablet_sink_txn_id) {
+ _tablet_sink_txn_id(sink.tablet_sink_txn_id),
+ _t_tablet_sink_exprs(&sink.tablet_sink_exprs) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -367,6 +373,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type ==
TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
+ if (sink.__isset.output_tuple_id) {
+ _output_tuple_id = sink.output_tuple_id;
+ }
}
Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
@@ -374,6 +383,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink)
{
if (_part_type == TPartitionType::RANGE_PARTITIONED) {
return Status::InternalError("TPartitionType::RANGE_PARTITIONED should
not be used");
}
+ if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs,
+
_tablet_sink_expr_ctxs));
+ }
return Status::OK();
}
@@ -386,6 +399,18 @@ Status ExchangeSinkOperatorX::prepare(RuntimeState* state)
{
Status ExchangeSinkOperatorX::open(RuntimeState* state) {
DCHECK(state != nullptr);
_compression_type = state->fragement_transmission_compression_type();
+ if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+ if (_output_tuple_id == -1) {
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_tablet_sink_expr_ctxs,
state,
+ _child_x->row_desc()));
+ } else {
+ auto* output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+ auto* output_row_desc = _pool->add(new
RowDescriptor(output_tuple_desc, false));
+ RETURN_IF_ERROR(
+ vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state,
*output_row_desc));
+ }
+ RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs,
state));
+ }
return Status::OK();
}
@@ -534,8 +559,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
local_state._row_distribution._deal_batched = true;
RETURN_IF_ERROR(local_state._send_new_partition_batch());
}
+ // the convert_block maybe different with block after execute exprs
+ // when send data we still use block
RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels,
num_channels,
- channel2rows,
convert_block.get(), eos));
+ channel2rows, block, eos));
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
{
SCOPED_TIMER(local_state._split_block_hash_compute_timer);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 5a7b8bf4201..a94392b906d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -217,7 +217,7 @@ private:
// for shuffle data by partition and tablet
int64_t _txn_id = -1;
- vectorized::VExprContextSPtrs _fake_expr_ctxs;
+ vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr;
std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
@@ -273,6 +273,7 @@ private:
const std::vector<TExpr> _texprs;
const RowDescriptor& _row_desc;
+ TTupleId _output_tuple_id = -1;
TPartitionType::type _part_type;
@@ -299,6 +300,8 @@ private:
const TTupleId _tablet_sink_tuple_id;
int64_t _tablet_sink_txn_id = -1;
std::shared_ptr<ObjectPool> _pool;
+ vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
+ const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr;
// for external table sink random partition
// Control the number of channels according to the flow, thereby
controlling the number of table sink writers.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index a3d3a1885f3..c0d1aeb917f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -353,8 +353,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
MultiCastDataSink multiCastDataSink = (MultiCastDataSink)
inputFragment.getSink();
DataStreamSink dataStreamSink =
multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
- TupleDescriptor tupleDescriptor =
generateTupleDesc(distribute.getOutput(), null, context);
- exchangeNode.updateTupleIds(tupleDescriptor);
+ exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc());
dataStreamSink.setExchNodeId(exchangeNode.getId());
dataStreamSink.setOutputPartition(dataPartition);
parentFragment.addChild(inputFragment);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 8fc6d6cbd14..f522a956899 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -174,6 +174,7 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
.createLocation(database.getId(),
olapTableSink.getDstTable());
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
+ dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs());
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index b9cf516bc3d..ef42190fa25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink {
protected TOlapTableLocationParam tabletSinkLocationParam = null;
protected TupleDescriptor tabletSinkTupleDesc = null;
protected long tabletSinkTxnId = -1;
+ protected List<Expr> tabletSinkExprs = null;
public DataStreamSink() {
@@ -145,6 +146,10 @@ public class DataStreamSink extends DataSink {
this.tabletSinkLocationParam = locationParam;
}
+ public void setTabletSinkExprs(List<Expr> tabletSinkExprs) {
+ this.tabletSinkExprs = tabletSinkExprs;
+ }
+
public void setTabletSinkTxnId(long txnId) {
this.tabletSinkTxnId = txnId;
}
@@ -224,6 +229,11 @@ public class DataStreamSink extends DataSink {
if (tabletSinkLocationParam != null) {
tStreamSink.setTabletSinkLocation(tabletSinkLocationParam);
}
+ if (tabletSinkExprs != null) {
+ for (Expr expr : tabletSinkExprs) {
+ tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
+ }
+ }
tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
result.setStreamSink(tStreamSink);
return result;
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 6610de4b688..dfdbbcc0a9f 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -188,6 +188,7 @@ struct TDataStreamSink {
10: optional Descriptors.TOlapTableLocationParam tablet_sink_location
11: optional i64 tablet_sink_txn_id
12: optional Types.TTupleId tablet_sink_tuple_id
+ 13: optional list<Exprs.TExpr> tablet_sink_exprs
}
struct TMultiCastDataStreamSink {
diff --git a/regression-test/data/nereids_p0/insert_into_table/random.out
b/regression-test/data/nereids_p0/insert_into_table/random.out
index d42426a991f..c774e023267 100644
--- a/regression-test/data/nereids_p0/insert_into_table/random.out
+++ b/regression-test/data/nereids_p0/insert_into_table/random.out
@@ -135,3 +135,12 @@
13 12 20480.0 48640045.000000 10944010779 2012-03-12
2012-03-12T12:11:12 22.634
13 12 20480.0 48640045.000000 10944010779 2012-03-12
2012-03-12T12:11:12 22.634
+-- !sql_select --
+1 11 11
+
+-- !sql_select2 --
+1
+
+-- !sql_select3 --
+601022201389484209 2024-04-09T20:58:49 卖卖 {"is_poi_first_order":0}
+
diff --git a/regression-test/suites/nereids_p0/insert_into_table/random.groovy
b/regression-test/suites/nereids_p0/insert_into_table/random.groovy
index 6cc5cb2b991..9edd855a9a8 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/random.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/random.groovy
@@ -43,4 +43,91 @@ suite('nereids_insert_random') {
sql 'set delete_without_partition=true'
sql '''delete from dup_t_type_cast_rd where id is not null'''
sql '''delete from dup_t_type_cast_rd where id is null'''
+
+ sql 'set enable_strict_consistency_dml=true'
+ sql 'drop table if exists tbl_1'
+ sql 'drop table if exists tbl_4'
+ sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS
10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");"""
+ sql """INSERT INTO tbl_1 VALUES (1, 11);"""
+ sql 'sync'
+ sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1,
k2) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "replication_num" = "1");
"""
+ sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;"""
+ sql 'sync'
+ qt_sql_select """ select * from tbl_4; """;
+
+
+ sql 'drop table if exists tbl_5'
+ sql 'drop table if exists tbl_6'
+ sql 'drop table if exists tbl_7'
+
+ sql """
+ CREATE TABLE `tbl_5` (
+ `orderId` varchar(96) NOT NULL,
+ `updated_at` datetime NOT NULL,
+ `userLabel` varchar(255) NULL,
+ `userTag` variant NULL
+ ) ENGINE=OLAP
+ duplicate KEY(`orderId`, `updated_at`)
+ DISTRIBUTED BY HASH(`orderId`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ sql """
+ CREATE TABLE tbl_6
+ (
+ order_id VARCHAR(96) NOT NULL,
+ updated_at DATETIMEV2 NOT NULL
+ ) ENGINE=OLAP
+ duplicate KEY(`order_id`)
+ DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """ INSERT INTO `tbl_6` values('601022201389484209', '2024-04-09
20:58:49');"""
+
+ sql """
+ CREATE TABLE tbl_7
+ (
+ orderId VARCHAR(96) NOT NULL,
+ userLabel VARIANT NULL
+ )ENGINE=OLAP
+ UNIQUE KEY(`orderId`)
+ DISTRIBUTED BY HASH(orderId) BUCKETS AUTO
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql """INSERT INTO `tbl_7`
values('601022201389484209','{\"is_poi_first_order\":0}');"""
+
+ sql 'sync'
+ qt_sql_select2 """ INSERT INTO
+ tbl_5
+ SELECT
+ A.order_id as orderId,
+ A.updated_at,
+ CASE
+ WHEN LOCATE('下单1次', CAST(B.userLabel AS
STRING)) > 0
+ OR LOCATE('买买', CAST(B.userLabel AS STRING)) >
0 then '买买'
+ when B.userLabel ["is_poi_first_order"] = 1
then '买买'
+ else '卖卖'
+ end as userLabel,
+ B.userLabel AS `userTag`
+ FROM
+ (
+ select
+ order_id,updated_at
+ from
+ tbl_6
+ ) AS A
+ LEFT JOIN (
+ select
+ orderId,userLabel
+ from
+ tbl_7
+ ) AS B ON A.order_id = B.orderId; """;
+ qt_sql_select3 """ select * from tbl_5; """;
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]