This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1514f78b87e [refactor](partial-update) Split partial update infos from tablet schema (#25147)
1514f78b87e is described below
commit 1514f78b87e3a3f09e2c8417d203b3968b8f5638
Author: bobhan1 <[email protected]>
AuthorDate: Tue Oct 17 14:21:40 2023 +0800
[refactor](partial-update) Split partial update infos from tablet schema (#25147)
---
be/src/olap/compaction.cpp | 2 +-
be/src/olap/delta_writer.cpp | 1 +
be/src/olap/delta_writer_v2.cpp | 10 ++--
be/src/olap/delta_writer_v2.h | 3 ++
be/src/olap/memtable.cpp | 20 ++++---
be/src/olap/memtable.h | 6 ++-
be/src/olap/memtable_writer.cpp | 10 ++--
be/src/olap/memtable_writer.h | 4 ++
be/src/olap/partial_update_info.h | 54 +++++++++++++++++++
be/src/olap/rowset/beta_rowset_writer.cpp | 2 +-
be/src/olap/rowset/beta_rowset_writer.h | 8 +++
be/src/olap/rowset/beta_rowset_writer_v2.h | 8 +++
be/src/olap/rowset/rowset_writer.h | 4 ++
be/src/olap/rowset/rowset_writer_context.h | 5 ++
be/src/olap/rowset/segment_v2/segment_writer.cpp | 20 ++++---
be/src/olap/rowset_builder.cpp | 16 +++---
be/src/olap/rowset_builder.h | 7 +++
be/src/olap/tablet.cpp | 24 +++++----
be/src/olap/tablet.h | 7 ++-
be/src/olap/tablet_schema.cpp | 68 +-----------------------
be/src/olap/tablet_schema.h | 24 +--------
be/src/olap/txn_manager.cpp | 18 ++++---
be/src/olap/txn_manager.h | 5 +-
be/src/service/backend_service.cpp | 2 +-
gensrc/proto/descriptors.proto | 6 +--
gensrc/proto/olap_file.proto | 4 +-
26 files changed, 188 insertions(+), 150 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 8fb2df2a49f..8383dac052f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -685,7 +685,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
it.rowset_ids.insert(_output_rowset->rowset_id());
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
it.partition_id, it.transaction_id,
_tablet->tablet_id(),
- _tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids);
+ _tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids, nullptr);
}
}
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 2f2ff7cc938..2acb4069910 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -99,6 +99,7 @@ Status DeltaWriter::init() {
RETURN_IF_ERROR(_rowset_builder.init());
RETURN_IF_ERROR(
_memtable_writer->init(_rowset_builder.rowset_writer(),
_rowset_builder.tablet_schema(),
+ _rowset_builder.get_partial_update_info(),
_rowset_builder.tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 039ad714004..ef3ff23f9d8 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -50,6 +50,7 @@
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
+#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
@@ -121,10 +122,11 @@ Status DeltaWriterV2::init() {
context.rowset_type = RowsetTypePB::BETA_ROWSET;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.data_dir = nullptr;
+ context.partial_update_info = _partial_update_info;
_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
- RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema,
+ RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
@@ -221,8 +223,10 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
- _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
- table_schema_param->partial_update_input_columns());
+ _partial_update_info = std::make_shared<PartialUpdateInfo>();
+ _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
+ table_schema_param->partial_update_input_columns(),
+ table_schema_param->is_strict_mode());
}
} // namespace doris
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 0f8d21a19b1..b2b1f5f1c19 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -34,6 +34,7 @@
#include "olap/delta_writer_context.h"
#include "olap/memtable_writer.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
@@ -126,6 +127,8 @@ private:
MonotonicStopWatch _lock_watch;
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
+
+ std::shared_ptr<PartialUpdateInfo> _partial_update_info;
};
} // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 47c16424478..d163abd26a7 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -49,7 +49,7 @@ using namespace ErrorCode;
MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs,
TupleDescriptor* tuple_desc,
- bool enable_unique_key_mow,
+ bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet_id(tablet_id),
@@ -77,8 +77,11 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
- if (_tablet_schema->is_partial_update()) {
- _num_columns = _tablet_schema->partial_input_column_size();
+ if (partial_update_info != nullptr) {
+ _is_partial_update = partial_update_info->is_partial_update;
+ if (_is_partial_update) {
+ _num_columns = partial_update_info->partial_update_input_columns.size();
+ }
}
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
@@ -178,7 +181,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
_init_agg_functions(&target_block);
}
if (_tablet_schema->has_sequence_col()) {
- if (_tablet_schema->is_partial_update()) {
+ if (_is_partial_update) {
// for unique key partial update, sequence column index in block
// may be different with the index in `_tablet_schema`
for (size_t i = 0; i < cloneBlock.columns(); i++) {
@@ -417,8 +420,8 @@ void MemTable::shrink_memtable_by_agg() {
bool MemTable::need_flush() const {
auto max_size = config::write_buffer_size;
- if (_tablet_schema->is_partial_update()) {
- auto update_columns_size = _tablet_schema->partial_input_column_size();
+ if (_is_partial_update) {
+ auto update_columns_size = _num_columns;
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
}
@@ -428,11 +431,6 @@ bool MemTable::need_flush() const {
bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = config::write_buffer_size_for_agg;
- if (_tablet_schema->is_partial_update()) {
- auto update_columns_size = _tablet_schema->partial_input_column_size();
- max_size = max_size * update_columns_size / _tablet_schema->num_columns();
- max_size = max_size > 1048576 ? max_size : 1048576;
- }
return memory_usage() >= max_size;
}
return false;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index ed9226c4a0c..b98e7411e3b 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -31,6 +31,8 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
+#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
@@ -167,7 +169,8 @@ class MemTable {
public:
MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
- bool enable_unique_key_mow, const std::shared_ptr<MemTracker>& insert_mem_tracker,
+ bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
+ const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();
@@ -202,6 +205,7 @@ private:
private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
+ bool _is_partial_update = false;
const KeysType _keys_type;
const TabletSchema* _tablet_schema;
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 4013f7fda99..2ef704f075a 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -39,6 +39,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
+#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
@@ -63,10 +64,13 @@ MemTableWriter::~MemTableWriter() {
}
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
- TabletSchemaSPtr tablet_schema, bool unique_key_mow) {
+ TabletSchemaSPtr tablet_schema,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info,
+ bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
+ _partial_update_info = partial_update_info;
_reset_mem_table();
@@ -195,8 +199,8 @@ void MemTableWriter::_reset_mem_table() {
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(),
_req.slots, _req.tuple_desc,
- _unique_key_mow, mem_table_insert_tracker,
- mem_table_flush_tracker));
+ _unique_key_mow, _partial_update_info.get(),
+ mem_table_insert_tracker, mem_table_flush_tracker));
_segment_num++;
}
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index b374f10bded..3491f72abd5 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -33,6 +33,7 @@
#include "olap/delta_writer_context.h"
#include "olap/memtable.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
@@ -67,6 +68,7 @@ public:
~MemTableWriter();
Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow = false);
Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
@@ -141,6 +143,8 @@ private:
int64_t _segment_num = 0;
MonotonicStopWatch _lock_watch;
+
+ std::shared_ptr<PartialUpdateInfo> _partial_update_info;
};
} // namespace doris
diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h
new file mode 100644
index 00000000000..cdea698b20d
--- /dev/null
+++ b/be/src/olap/partial_update_info.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/tablet_schema.h"
+
+namespace doris {
+
+struct PartialUpdateInfo {
+ void init(const TabletSchema& tablet_schema, bool partial_update,
+ const std::set<string>& partial_update_cols, bool is_strict_mode) {
+ is_partial_update = partial_update;
+ partial_update_input_columns = partial_update_cols;
+ missing_cids.clear();
+ update_cids.clear();
+ for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
+ auto tablet_column = tablet_schema.column(i);
+ if (!partial_update_input_columns.contains(tablet_column.name())) {
+ missing_cids.emplace_back(i);
+ if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
+ can_insert_new_rows_in_partial_update = false;
+ }
+ } else {
+ update_cids.emplace_back(i);
+ }
+ }
+ this->is_strict_mode = is_strict_mode;
+ }
+
+ bool is_partial_update {false};
+ std::set<std::string> partial_update_input_columns;
+ std::vector<uint32_t> missing_cids;
+ std::vector<uint32_t> update_cids;
+ // if key not exist in old rowset, use default value or null value for the unmentioned cols
+ // to generate a new row, only available in non-strict mode
+ bool can_insert_new_rows_in_partial_update {true};
+ bool is_strict_mode {false};
+};
+} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index b8c578ff80d..9262d603e28 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -135,7 +135,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
SCOPED_RAW_TIMER(&_delete_bitmap_ns);
if (!_context.tablet->enable_unique_key_merge_on_write() ||
- _context.tablet_schema->is_partial_update()) {
+ (_context.partial_update_info && _context.partial_update_info->is_partial_update)) {
return Status::OK();
}
auto rowset = _build_tmp();
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 3691a661403..859ab84c52f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -131,6 +131,14 @@ public:
int64_t segment_writer_ns() override { return _segment_writer_ns; }
+ std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override {
+ return _context.partial_update_info;
+ }
+
+ bool is_partial_update() override {
+ return _context.partial_update_info && _context.partial_update_info->is_partial_update;
+ }
+
private:
Status _create_file_writer(std::string path, io::FileWriterPtr&
file_writer);
Status _check_segment_number_limit();
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h
index 1279a564ff3..c845425bb10 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -134,6 +134,14 @@ public:
int64_t segment_writer_ns() override { return _segment_writer_ns; }
+ std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override {
+ return _context.partial_update_info;
+ }
+
+ bool is_partial_update() override {
+ return _context.partial_update_info && _context.partial_update_info->is_partial_update;
+ }
+
private:
RowsetWriterContext _context;
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index d32c813233b..869abea483e 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -151,6 +151,10 @@ public:
virtual int64_t segment_writer_ns() { return 0; }
+ virtual std::shared_ptr<PartialUpdateInfo> get_partial_update_info() = 0;
+
+ virtual bool is_partial_update() = 0;
+
private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index 985efa0809e..6c4c7a04e0c 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -21,6 +21,7 @@
#include "io/fs/file_system.h"
#include "olap/olap_define.h"
+#include "olap/partial_update_info.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
@@ -105,6 +106,10 @@ struct RowsetWriterContext {
// segcompaction for this RowsetWriter, disable it for some transient writers
bool enable_segcompaction = true;
+
+ std::shared_ptr<PartialUpdateInfo> partial_update_info;
+
+ bool is_transient_rowset_writer = false;
};
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 9508557d0ef..0a6e72ad662 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -333,9 +333,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
+ DCHECK(_opts.rowset_ctx->partial_update_info);
// find missing column cids
- std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
- std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+ std::vector<uint32_t> missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
+ std::vector<uint32_t> including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
@@ -421,7 +422,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
auto st = _tablet->lookup_row_key(key, have_input_seq_column,
specified_rowsets, &loc,
_mow_context->max_version,
segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
- if (_tablet_schema->is_strict_mode()) {
+ if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id,
_segment_id,
@@ -429,7 +430,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
segment_pos);
}
- if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
+ if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) {
return Status::InternalError(
"the unmentioned columns should have default value or be nullable for "
"newly inserted rows in non-strict mode partial update");
@@ -492,7 +493,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
// convert missing columns and send to column writer
- auto cids_missing = _tablet_schema->get_missing_cids();
+ auto cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids;
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block,
row_pos, num_rows,
cids_missing);
for (auto cid : cids_missing) {
@@ -545,8 +546,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
bool has_default_or_nullable,
const size_t& segment_start_pos) {
// create old value columns
- auto old_value_block = _tablet_schema->create_missing_columns_block();
- std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
+ std::vector<uint32_t> cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids;
+ auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing);
CHECK(cids_missing.size() == old_value_block.columns());
auto mutable_old_columns = old_value_block.mutate_columns();
bool has_row_column = _tablet_schema->store_row_column();
@@ -652,7 +653,10 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
- if (_tablet_schema->is_partial_update() && _opts.write_type == DataWriteType::TYPE_DIRECT) {
+ if (_opts.rowset_ctx->partial_update_info &&
+ _opts.rowset_ctx->partial_update_info->is_partial_update &&
+ _opts.write_type == DataWriteType::TYPE_DIRECT &&
+ !_opts.rowset_ctx->is_transient_rowset_writer) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos,
num_rows));
return Status::OK();
}
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 0173643b8c5..1e1facab69a 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -43,6 +43,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/brpc_client_cache.h"
#include "util/mem_info.h"
@@ -172,6 +173,7 @@ Status RowsetBuilder::init() {
context.write_type = DataWriteType::TYPE_DIRECT;
context.mow_context = mow_context;
context.write_file_cache = _req.write_file_cache;
+ context.partial_update_info = _partial_update_info;
std::unique_ptr<RowsetWriter> rowset_writer;
RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &rowset_writer));
_rowset_writer = std::move(rowset_writer);
@@ -223,7 +225,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
// For partial update, we need to fill in the entire row of data, during the calculation
// of the delete bitmap. This operation is resource-intensive, and we need to minimize
// the number of times it occurs. Therefore, we skip this operation here.
- if (_rowset->tablet_schema()->is_partial_update()) {
+ if (_partial_update_info->is_partial_update) {
return Status::OK();
}
@@ -235,8 +237,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
}
Status RowsetBuilder::wait_calc_delete_bitmap() {
- if (!_tablet->enable_unique_key_merge_on_write() ||
- _rowset->tablet_schema()->is_partial_update()) {
+ if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update) {
return Status::OK();
}
std::lock_guard<std::mutex> l(_lock);
@@ -278,7 +279,7 @@ Status RowsetBuilder::commit_txn() {
if (_tablet->enable_unique_key_merge_on_write()) {
_storage_engine->txn_manager()->set_txn_related_delete_bitmap(
_req.partition_id, _req.txn_id, _tablet->tablet_id(),
_tablet->tablet_uid(), true,
- _delete_bitmap, _rowset_ids);
+ _delete_bitmap, _rowset_ids, _partial_update_info);
}
_is_committed = true;
@@ -321,9 +322,10 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,
_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
- _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
- table_schema_param->partial_update_input_columns());
- _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
+ _partial_update_info = std::make_shared<PartialUpdateInfo>();
+ _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
+ table_schema_param->partial_update_input_columns(),
+ table_schema_param->is_strict_mode());
}
} // namespace doris
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index 8bb94c20905..b3193f2eafe 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -32,6 +32,7 @@
#include "common/status.h"
#include "olap/delta_writer_context.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
@@ -86,6 +87,10 @@ public:
// For UT
DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
+ std::shared_ptr<PartialUpdateInfo> get_partial_update_info() const {
+ return _partial_update_info;
+ }
+
private:
void _garbage_collection();
@@ -113,6 +118,8 @@ private:
// current rowset_ids, used to do diff in publish_version
RowsetIdUnorderedSet _rowset_ids;
+ std::shared_ptr<PartialUpdateInfo> _partial_update_info;
+
RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _build_rowset_timer = nullptr;
RuntimeProfile::Counter* _submit_delete_bitmap_timer = nullptr;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8d11e39731e..12ef819d6fb 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2006,14 +2006,14 @@ Status Tablet::create_rowset_writer(RowsetWriterContext& context,
// create a rowset writer with rowset_id and seg_id
// after writer, merge this transient rowset with original rowset
-Status Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
- std::unique_ptr<RowsetWriter>* rowset_writer) {
+Status Tablet::create_transient_rowset_writer(
+ RowsetSharedPtr rowset_ptr, std::unique_ptr<RowsetWriter>* rowset_writer,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info) {
RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
context.tablet_schema = std::make_shared<TabletSchema>();
context.tablet_schema->copy_from(*(rowset_ptr->tablet_schema()));
- context.tablet_schema->set_partial_update_info(false, std::set<std::string>());
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = table_id();
context.enable_segcompaction = false;
@@ -2021,6 +2021,8 @@ Status Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
// get the shared_ptr from tablet_manager.
context.tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id());
context.write_type = DataWriteType::TYPE_DIRECT;
+ context.partial_update_info = partial_update_info;
+ context.is_transient_rowset_writer = true;
RETURN_IF_ERROR(
create_transient_rowset_writer(context, rowset_ptr->rowset_id(),
rowset_writer));
(*rowset_writer)->set_segment_start_id(rowset_ptr->num_segments());
@@ -2927,7 +2929,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
auto rowset_id = rowset->rowset_id();
Version dummy_version(end_version + 1, end_version + 1);
auto rowset_schema = rowset->tablet_schema();
- bool is_partial_update = rowset_schema->is_partial_update();
+ bool is_partial_update = rowset_writer && rowset_writer->is_partial_update();
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
@@ -3048,8 +3050,11 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}
if (pos > 0) {
+ auto partial_update_info = rowset_writer->get_partial_update_info();
+ DCHECK(partial_update_info);
RETURN_IF_ERROR(generate_new_block_for_partial_update(
- rowset_schema, read_plan_ori, read_plan_update, rsid_to_rowset, &block));
+ rowset_schema, partial_update_info->missing_cids, partial_update_info->update_cids,
+ read_plan_ori, read_plan_update, rsid_to_rowset, &block));
sort_block(block, ordered_block);
RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
}
@@ -3123,7 +3128,8 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
}
Status Tablet::generate_new_block_for_partial_update(
- TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori,
+ TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids,
+ const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
const PartialUpdateReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block) {
@@ -3134,10 +3140,8 @@ Status Tablet::generate_new_block_for_partial_update(
// 4. mark current keys deleted
CHECK(output_block);
auto full_mutable_columns = output_block->mutate_columns();
- auto old_block = rowset_schema->create_missing_columns_block();
- auto missing_cids = rowset_schema->get_missing_cids();
- auto update_block = rowset_schema->create_update_columns_block();
- auto update_cids = rowset_schema->get_update_cids();
+ auto old_block = rowset_schema->create_block_by_cids(missing_cids);
+ auto update_block = rowset_schema->create_block_by_cids(update_cids);
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids,
read_plan_ori, rsid_to_rowset,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c7e7dcc6e31..576dcce555f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -44,6 +44,7 @@
#include "olap/binlog_config.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader.h"
@@ -351,7 +352,8 @@ public:
std::unique_ptr<RowsetWriter>* rowset_writer);
Status create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
- std::unique_ptr<RowsetWriter>* rowset_writer);
+ std::unique_ptr<RowsetWriter>* rowset_writer,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info);
Status create_transient_rowset_writer(RowsetWriterContext& context, const RowsetId& rowset_id,
std::unique_ptr<RowsetWriter>* rowset_writer);
@@ -477,7 +479,8 @@ public:
void prepare_to_read(const RowLocation& row_location, size_t pos,
PartialUpdateReadPlan* read_plan);
Status generate_new_block_for_partial_update(
- TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori,
+ TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids,
+ const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
const PartialUpdateReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block);
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 9cbc9ab608f..20260f2f4ff 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -710,9 +710,6 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
_indexes.clear();
_field_name_to_index.clear();
_field_id_to_index.clear();
- _partial_update_input_columns.clear();
- _missing_cids.clear();
- _update_cids.clear();
for (auto& column_pb : schema.column()) {
TabletColumn column;
column.init_from_pb(column_pb);
@@ -758,23 +755,6 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
_sort_col_num = schema.sort_col_num();
_compression_type = schema.compression_type();
_schema_version = schema.schema_version();
- _is_partial_update = schema.is_partial_update();
- for (auto& col_name : schema.partial_update_input_columns()) {
- _partial_update_input_columns.emplace(col_name);
- }
- if (_is_partial_update) {
- for (auto i = 0; i < _cols.size(); ++i) {
- if (_partial_update_input_columns.count(_cols[i].name()) == 0) {
- _missing_cids.emplace_back(i);
- auto tablet_column = column(i);
- if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
- _can_insert_new_rows_in_partial_update = false;
- }
- } else {
- _update_cids.emplace_back(i);
- }
- }
- }
}
void TabletSchema::copy_from(const TabletSchema& tablet_schema) {
@@ -916,10 +896,6 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_schema_version(_schema_version);
tablet_schema_pb->set_compression_type(_compression_type);
tablet_schema_pb->set_version_col_idx(_version_col_idx);
- tablet_schema_pb->set_is_partial_update(_is_partial_update);
- for (auto& col : _partial_update_input_columns) {
- *tablet_schema_pb->add_partial_update_input_columns() = col;
- }
}
size_t TabletSchema::row_size() const {
@@ -1103,19 +1079,9 @@ vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const {
return block;
}
-vectorized::Block TabletSchema::create_missing_columns_block() {
- vectorized::Block block;
- for (const auto& cid : _missing_cids) {
- auto col = _cols[cid];
- auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col);
- block.insert({data_type->create_column(), data_type, col.name()});
- }
- return block;
-}
-
-vectorized::Block TabletSchema::create_update_columns_block() {
+vectorized::Block TabletSchema::create_block_by_cids(const std::vector<uint32_t>& cids) {
vectorized::Block block;
- for (const auto& cid : _update_cids) {
+ for (const auto& cid : cids) {
auto col = _cols[cid];
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col);
block.insert({data_type->create_column(), data_type, col.name()});
@@ -1123,36 +1089,6 @@ vectorized::Block
TabletSchema::create_update_columns_block() {
return block;
}
-void TabletSchema::set_partial_update_info(bool is_partial_update,
- const std::set<string>&
partial_update_input_columns) {
- _is_partial_update = is_partial_update;
- _partial_update_input_columns = partial_update_input_columns;
- _missing_cids.clear();
- _update_cids.clear();
- for (auto i = 0; i < _cols.size(); ++i) {
- if (_partial_update_input_columns.count(_cols[i].name()) == 0) {
- _missing_cids.emplace_back(i);
- auto tablet_column = column(i);
- if (!tablet_column.has_default_value() &&
!tablet_column.is_nullable()) {
- _can_insert_new_rows_in_partial_update = false;
- }
- } else {
- _update_cids.emplace_back(i);
- }
- }
-}
-
-bool TabletSchema::is_column_missing(size_t cid) const {
- DCHECK(cid < _cols.size());
- if (!_is_partial_update) {
- return false;
- }
- if (_partial_update_input_columns.count(_cols[cid].name()) == 0) {
- return true;
- }
- return false;
-}
-
bool operator==(const TabletColumn& a, const TabletColumn& b) {
if (a._unique_id != b._unique_id) return false;
if (a._col_name != b._col_name) return false;
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 5e6ffa3ac77..42fabc6c2f6 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -351,20 +351,7 @@ public:
return str;
}
- vectorized::Block create_missing_columns_block();
- vectorized::Block create_update_columns_block();
- void set_partial_update_info(bool is_partial_update,
- const std::set<string>&
partial_update_input_columns);
- bool is_partial_update() const { return _is_partial_update; }
- size_t partial_input_column_size() const { return
_partial_update_input_columns.size(); }
- bool is_column_missing(size_t cid) const;
- bool can_insert_new_rows_in_partial_update() const {
- return _can_insert_new_rows_in_partial_update;
- }
- void set_is_strict_mode(bool is_strict_mode) { _is_strict_mode =
is_strict_mode; }
- bool is_strict_mode() const { return _is_strict_mode; }
- std::vector<uint32_t> get_missing_cids() const { return _missing_cids; }
- std::vector<uint32_t> get_update_cids() const { return _update_cids; }
+ vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
@@ -402,15 +389,6 @@ private:
int64_t _mem_size = 0;
bool _store_row_column = false;
bool _skip_write_index_on_load = false;
-
- bool _is_partial_update;
- std::set<std::string> _partial_update_input_columns;
- std::vector<uint32_t> _missing_cids;
- std::vector<uint32_t> _update_cids;
- // if key not exist in old rowset, use default value or null value for the
unmentioned cols
- // to generate a new row, only available in non-strict mode
- bool _can_insert_new_rows_in_partial_update = true;
- bool _is_strict_mode = false;
};
bool operator==(const TabletSchema& a, const TabletSchema& b);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 6d14e620d96..e5d7ceffa91 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -35,6 +35,7 @@
#include "common/logging.h"
#include "olap/data_dir.h"
#include "olap/delta_writer.h"
+#include "olap/olap_common.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/schema_change.h"
@@ -175,11 +176,11 @@ Status TxnManager::delete_txn(TPartitionId partition_id,
const TabletSharedPtr&
tablet->tablet_id(), tablet->tablet_uid());
}
-void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id,
- TTransactionId transaction_id,
TTabletId tablet_id,
- TabletUid tablet_uid, bool
unique_key_merge_on_write,
- DeleteBitmapPtr delete_bitmap,
- const RowsetIdUnorderedSet&
rowset_ids) {
+void TxnManager::set_txn_related_delete_bitmap(
+ TPartitionId partition_id, TTransactionId transaction_id, TTabletId
tablet_id,
+ TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr
delete_bitmap,
+ const RowsetIdUnorderedSet& rowset_ids,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, tablet_uid);
@@ -205,6 +206,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId
partition_id,
load_info.unique_key_merge_on_write = unique_key_merge_on_write;
load_info.delete_bitmap = delete_bitmap;
load_info.rowset_ids = rowset_ids;
+ load_info.partial_update_info = partial_update_info;
}
}
@@ -363,7 +365,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId
partition_id,
// update delete_bitmap
if (tablet_txn_info.unique_key_merge_on_write) {
std::unique_ptr<RowsetWriter> rowset_writer;
- static_cast<void>(tablet->create_transient_rowset_writer(rowset,
&rowset_writer));
+ static_cast<void>(tablet->create_transient_rowset_writer(
+ rowset, &rowset_writer, tablet_txn_info.partial_update_info));
int64_t t2 = MonotonicMicros();
RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset,
tablet_txn_info.rowset_ids,
@@ -371,7 +374,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId
partition_id,
rowset_writer.get()));
int64_t t3 = MonotonicMicros();
stats->calc_delete_bitmap_time_us = t3 - t2;
- if (rowset->tablet_schema()->is_partial_update()) {
+ if (tablet_txn_info.partial_update_info &&
+ tablet_txn_info.partial_update_info->is_partial_update) {
// build rowset writer and merge transient rowset
RETURN_IF_ERROR(rowset_writer->flush());
RowsetSharedPtr transient_rowset;
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index c311fed8799..a7dcc852d9b 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -36,6 +36,7 @@
#include "common/status.h"
#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/segment_v2/segment.h"
@@ -59,6 +60,7 @@ struct TabletTxnInfo {
RowsetIdUnorderedSet rowset_ids;
int64_t creation_time;
bool ingest {false};
+ std::shared_ptr<PartialUpdateInfo> partial_update_info;
TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
: load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
@@ -185,7 +187,8 @@ public:
TTabletId tablet_id, TabletUid
tablet_uid,
bool unique_key_merge_on_write,
DeleteBitmapPtr delete_bitmap,
- const RowsetIdUnorderedSet& rowset_ids);
+ const RowsetIdUnorderedSet& rowset_ids,
+ std::shared_ptr<PartialUpdateInfo>
partial_update_info);
void get_all_commit_tablet_txn_info_by_tablet(
const TabletSharedPtr& tablet, CommitTabletTxnInfoVec*
commit_tablet_txn_info_vec);
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 69c9e6608e7..436187d779b 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -717,7 +717,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult&
result,
if (local_tablet->enable_unique_key_merge_on_write()) {
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
partition_id, txn_id, local_tablet_id,
local_tablet->tablet_uid(), true,
- delete_bitmap, pre_rowset_ids);
+ delete_bitmap, pre_rowset_ids, nullptr);
}
tstatus.__set_status_code(TStatusCode::OK);
diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto
index abebf8fde5e..99101767644 100644
--- a/gensrc/proto/descriptors.proto
+++ b/gensrc/proto/descriptors.proto
@@ -64,8 +64,8 @@ message POlapTableSchemaParam {
repeated PSlotDescriptor slot_descs = 4;
required PTupleDescriptor tuple_desc = 5;
repeated POlapTableIndexSchema indexes = 6;
- optional bool partial_update = 7;
- repeated string partial_update_input_columns = 8;
- optional bool is_strict_mode = 9 [default = false];
+ optional bool partial_update = 7; // deprecated
+ repeated string partial_update_input_columns = 8; // deprecated
+ optional bool is_strict_mode = 9 [default = false]; // deprecated
};
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index fe5f76f6b4f..4c49e31a7f4 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -250,8 +250,8 @@ message TabletSchemaPB {
optional int32 version_col_idx = 17 [default = -1];
optional bool store_row_column = 18 [default=false]; // store tuplerow
oriented column
optional bool is_dynamic_schema = 19 [default=false]; // deprecated
- optional bool is_partial_update = 20 [default=false];
- repeated string partial_update_input_columns = 21;
+ optional bool is_partial_update = 20 [default=false]; // deprecated
+ repeated string partial_update_input_columns = 21; // deprecated
optional bool enable_single_replica_compaction = 22 [default=false];
optional bool skip_write_index_on_load = 23 [default=false];
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]