This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new de62c00f4e4 [fix](move-memtable) init auto partition context in 
VRowDistribution::open (#26911)
de62c00f4e4 is described below

commit de62c00f4e4414c1f83ecd2db9b216f1bfc52100
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 14 08:16:14 2023 +0800

    [fix](move-memtable) init auto partition context in VRowDistribution::open 
(#26911)
---
 be/src/vec/sink/vrow_distribution.h       | 16 ++++++++++++++++
 be/src/vec/sink/vtablet_sink_v2.cpp       | 10 ++++++++--
 be/src/vec/sink/vtablet_sink_v2.h         |  3 ++-
 be/src/vec/sink/writer/vtablet_writer.cpp | 18 +++++-------------
 be/src/vec/sink/writer/vtablet_writer.h   |  4 +---
 5 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/be/src/vec/sink/vrow_distribution.h 
b/be/src/vec/sink/vrow_distribution.h
index 5da964d44fc..3376eb5ab6f 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -88,6 +88,22 @@ public:
         _schema = ctx->schema;
     }
 
+    Status open(RowDescriptor* output_row_desc) {
+        if (_vpartition->is_auto_partition()) {
+            auto [part_ctx, part_func] = _get_partition_function();
+            RETURN_IF_ERROR(part_ctx->prepare(_state, *output_row_desc));
+            RETURN_IF_ERROR(part_ctx->open(_state));
+        }
+        for (auto& index : _schema->indexes()) {
+            auto& where_clause = index->where_clause;
+            if (where_clause != nullptr) {
+                RETURN_IF_ERROR(where_clause->prepare(_state, 
*output_row_desc));
+                RETURN_IF_ERROR(where_clause->open(_state));
+            }
+        }
+        return Status::OK();
+    }
+
     // auto partition
     // mv where clause
     // v1 needs index->node->row_ids - tabletids
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp 
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 08ddfb03811..3aa23e10595 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -91,7 +91,7 @@ static Status on_partitions_created(void* writer, 
TCreatePartitionResult* result
     return 
static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result);
 }
 
-void VOlapTableSinkV2::_init_row_distribution() {
+Status VOlapTableSinkV2::_init_row_distribution() {
     VRowDistributionContext ctx;
 
     ctx.state = _state;
@@ -108,6 +108,10 @@ void VOlapTableSinkV2::_init_row_distribution() {
     ctx.schema = _schema;
 
     _row_distribution.init(&ctx);
+
+    RETURN_IF_ERROR(_row_distribution.open(_output_row_desc));
+
+    return Status::OK();
 }
 
 Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
@@ -174,6 +178,8 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
     _block_convertor = 
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
     _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
                                         _state->batch_size());
+    _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, 
false));
+
     // add all counter
     _input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
     _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
@@ -209,7 +215,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
 
     _build_tablet_node_mapping();
     RETURN_IF_ERROR(_open_streams(state->backend_id()));
-    _init_row_distribution();
+    RETURN_IF_ERROR(_init_row_distribution());
 
     return Status::OK();
 }
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/vtablet_sink_v2.h
index 1a67ea581ec..6f369286677 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -119,7 +119,7 @@ public:
     Status on_partitions_created(TCreatePartitionResult* result);
 
 private:
-    void _init_row_distribution();
+    Status _init_row_distribution();
 
     Status _open_streams(int64_t src_id);
 
@@ -149,6 +149,7 @@ private:
 
     // this is tuple descriptor of destination OLAP table
     TupleDescriptor* _output_tuple_desc = nullptr;
+    RowDescriptor* _output_row_desc = nullptr;
 
     // number of senders used to insert into OlapTable, if we only support 
single node insert,
     // all data from select should collectted and then send to OlapTable.
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index f3270c4b9ff..22b72f28cb4 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1063,7 +1063,7 @@ static Status on_partitions_created(void* writer, 
TCreatePartitionResult* result
     return static_cast<VTabletWriter*>(writer)->on_partitions_created(result);
 }
 
-void VTabletWriter::_init_row_distribution() {
+Status VTabletWriter::_init_row_distribution() {
     VRowDistributionContext ctx;
 
     ctx.state = _state;
@@ -1080,6 +1080,9 @@ void VTabletWriter::_init_row_distribution() {
     ctx.schema = _schema;
 
     _row_distribution.init(&ctx);
+
+    RETURN_IF_ERROR(_row_distribution.open(_output_row_desc));
+    return Status::OK();
 }
 
 Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
@@ -1218,19 +1221,13 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
         RETURN_IF_ERROR(_channels.back()->init(state, tablets));
     }
 
-    // prepare for auto partition functions
-    if (_vpartition->is_auto_partition()) {
-        auto [part_ctx, part_func] = _get_partition_function();
-        RETURN_IF_ERROR(part_ctx->prepare(_state, *_output_row_desc));
-        RETURN_IF_ERROR(part_ctx->open(_state));
-    }
     if (_group_commit) {
         RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, 
_tb_id, _wal_id,
                                                                     
_state->import_label()));
         
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, 
_wal_writer));
     }
 
-    _init_row_distribution();
+    RETURN_IF_ERROR(_init_row_distribution());
 
     _inited = true;
     return Status::OK();
@@ -1287,11 +1284,6 @@ Status VTabletWriter::_incremental_open_node_channel(
     return Status::OK();
 }
 
-std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr>
-VTabletWriter::_get_partition_function() {
-    return {_vpartition->get_part_func_ctx(), 
_vpartition->get_partition_function()};
-}
-
 Status VTabletWriter::_cancel_channel_and_check_intolerable_failure(
         Status status, const std::string& err_msg, const 
std::shared_ptr<IndexChannel> ich,
         const std::shared_ptr<VNodeChannel> nch) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index ea998a0f0b4..3b8df40beee 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -549,7 +549,7 @@ private:
     using ChannelDistributionPayload = std::unordered_map<VNodeChannel*, 
Payload>;
     using ChannelDistributionPayloadVec = 
std::vector<std::unordered_map<VNodeChannel*, Payload>>;
 
-    void _init_row_distribution();
+    Status _init_row_distribution();
 
     Status _init(RuntimeState* state, RuntimeProfile* profile);
 
@@ -564,8 +564,6 @@ private:
                                                          const 
std::shared_ptr<IndexChannel> ich,
                                                          const 
std::shared_ptr<VNodeChannel> nch);
 
-    std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr> 
_get_partition_function();
-
     void _cancel_all_channel(Status status);
 
     void _save_missing_values(vectorized::ColumnPtr col, 
vectorized::DataTypePtr value_type,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to