This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 29fbe749cd [refactor](load) split rowset builder out of delta writer
(#22805)
29fbe749cd is described below
commit 29fbe749cd1bd384d6075d0c70a522d8cce9fe19
Author: Kaijie Chen <[email protected]>
AuthorDate: Mon Aug 14 10:32:58 2023 +0800
[refactor](load) split rowset builder out of delta writer (#22805)
---
be/src/olap/delta_writer.cpp | 269 ++++---------------------------
be/src/olap/delta_writer.h | 37 +----
be/src/olap/delta_writer_context.h | 45 ++++++
be/src/olap/memtable_writer.cpp | 2 +-
be/src/olap/memtable_writer.h | 10 +-
be/src/olap/rowset_builder.cpp | 313 +++++++++++++++++++++++++++++++++++++
be/src/olap/rowset_builder.h | 123 +++++++++++++++
7 files changed, 521 insertions(+), 278 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 97e7d0a9f5..9c5b4b1464 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -37,17 +37,13 @@
#include "gutil/strings/numbers.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "olap/memtable_flush_executor.h"
-#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset_meta.h"
-#include "olap/rowset/rowset_writer.h"
-#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
-#include "olap/tablet_meta.h"
#include "olap/txn_manager.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
@@ -55,7 +51,6 @@
#include "util/mem_info.h"
#include "util/ref_count_closure.h"
#include "util/stopwatch.hpp"
-#include "util/time.h"
#include "vec/core/block.h"
namespace doris {
@@ -70,27 +65,19 @@ Status DeltaWriter::open(WriteRequest* req, DeltaWriter**
writer, RuntimeProfile
DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
const UniqueId& load_id)
: _req(*req),
- _tablet(nullptr),
- _cur_rowset(nullptr),
- _rowset_writer(nullptr),
+ _rowset_builder(*req, storage_engine, profile),
_memtable_writer(*req, profile),
- _tablet_schema(new TabletSchema),
- _delta_written_success(false),
- _storage_engine(storage_engine),
- _load_id(load_id) {
+ _storage_engine(storage_engine) {
_init_profile(profile);
}
void DeltaWriter::_init_profile(RuntimeProfile* profile) {
_profile = profile->create_child(fmt::format("DeltaWriter {}",
_req.tablet_id), true, true);
- _close_wait_timer = ADD_TIMER(_profile, "DeltaWriterCloseWaitTime");
+ _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
+ _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime");
}
DeltaWriter::~DeltaWriter() {
- if (_is_init && !_delta_written_success) {
- _garbage_collection();
- }
-
if (!_is_init) {
return;
}
@@ -98,110 +85,17 @@ DeltaWriter::~DeltaWriter() {
// cancel and wait all memtables in flush queue to be finished
_memtable_writer.cancel();
- if (_tablet != nullptr) {
+ if (_rowset_builder.tablet() != nullptr) {
const FlushStatistic& stat = _memtable_writer.get_flush_token_stats();
- _tablet->flush_bytes->increment(stat.flush_size_bytes);
- _tablet->flush_finish_count->increment(stat.flush_finish_count);
- }
-
- if (_calc_delete_bitmap_token != nullptr) {
- _calc_delete_bitmap_token->cancel();
- }
-
- if (_tablet != nullptr) {
- _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
-
_rowset_writer->rowset_id().to_string());
- }
-}
-
-void DeltaWriter::_garbage_collection() {
- Status rollback_status = Status::OK();
- TxnManager* txn_mgr = _storage_engine->txn_manager();
- if (_tablet != nullptr) {
- rollback_status = txn_mgr->rollback_txn(_req.partition_id, _tablet,
_req.txn_id);
- }
- // has to check rollback status, because the rowset maybe committed in
this thread and
- // published in another thread, then rollback will failed.
- // when rollback failed should not delete rowset
- if (rollback_status.ok()) {
- _storage_engine->add_unused_rowset(_cur_rowset);
+
_rowset_builder.tablet()->flush_bytes->increment(stat.flush_size_bytes);
+
_rowset_builder.tablet()->flush_finish_count->increment(stat.flush_finish_count);
}
}
Status DeltaWriter::init() {
- TabletManager* tablet_mgr = _storage_engine->tablet_manager();
- _tablet = tablet_mgr->get_tablet(_req.tablet_id);
- if (_tablet == nullptr) {
- return Status::Error<TABLE_NOT_FOUND>("fail to find tablet.
tablet_id={}, schema_hash={}",
- _req.tablet_id,
_req.schema_hash);
- }
-
- // get rowset ids snapshot
- if (_tablet->enable_unique_key_merge_on_write()) {
- std::lock_guard<std::shared_mutex> lck(_tablet->get_header_lock());
- _cur_max_version = _tablet->max_version_unlocked().second;
- // tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
- // Disable 'partial_update' when the tablet is undergoing a
'schema changing process'
- if (_req.table_schema_param->is_partial_update()) {
- return Status::InternalError(
- "Unable to do 'partial_update' when "
- "the tablet is undergoing a 'schema changing
process'");
- }
- _rowset_ids.clear();
- } else {
- _rowset_ids = _tablet->all_rs_id(_cur_max_version);
- }
- }
-
- // check tablet version number
- if (!config::disable_auto_compaction &&
- _tablet->exceed_version_limit(config::max_tablet_version_num - 100) &&
- !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
- //trigger compaction
- StorageEngine::instance()->submit_compaction_task(
- _tablet, CompactionType::CUMULATIVE_COMPACTION, true);
- if (_tablet->version_count() > config::max_tablet_version_num) {
- return Status::Error<TOO_MANY_VERSION>(
- "failed to init delta writer. version count: {}, exceed
limit: {}, tablet: {}",
- _tablet->version_count(), config::max_tablet_version_num,
_tablet->full_name());
- }
- }
-
- {
- std::shared_lock base_migration_rlock(_tablet->get_migration_lock(),
std::try_to_lock);
- if (!base_migration_rlock.owns_lock()) {
- return Status::Error<TRY_LOCK_FAILED>("get lock failed");
- }
- std::lock_guard<std::mutex> push_lock(_tablet->get_push_lock());
-
RETURN_IF_ERROR(_storage_engine->txn_manager()->prepare_txn(_req.partition_id,
_tablet,
-
_req.txn_id, _req.load_id));
- }
- if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap ==
nullptr) {
- _delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id()));
- }
- // build tablet schema in request level
- _build_current_tablet_schema(_req.index_id, _req.table_schema_param,
*_tablet->tablet_schema());
- RowsetWriterContext context;
- context.txn_id = _req.txn_id;
- context.load_id = _req.load_id;
- context.rowset_state = PREPARED;
- context.segments_overlap = OVERLAPPING;
- context.tablet_schema = _tablet_schema;
- context.newest_write_timestamp = UnixSeconds();
- context.tablet_id = _tablet->tablet_id();
- context.tablet = _tablet;
- context.write_type = DataWriteType::TYPE_DIRECT;
- context.mow_context = std::make_shared<MowContext>(_cur_max_version,
_req.txn_id, _rowset_ids,
- _delete_bitmap);
- std::unique_ptr<RowsetWriter> rowset_writer;
- RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &rowset_writer));
- _rowset_writer = std::move(rowset_writer);
- _memtable_writer.init(_rowset_writer, _tablet_schema,
- _tablet->enable_unique_key_merge_on_write());
- _calc_delete_bitmap_token =
_storage_engine->calc_delete_bitmap_executor()->create_token();
-
+    // RowsetBuilder::init() reports its failures (tablet not found, too many versions,
+    // migration lock busy, prepare_txn error) through the returned Status. Stop here on
+    // error: in those cases the builder's tablet / rowset writer are not set up, and the
+    // tablet() dereference below would be a null access.
+    RETURN_IF_ERROR(_rowset_builder.init());
+    _memtable_writer.init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(),
+                          _rowset_builder.tablet()->enable_unique_key_merge_on_write());
_is_init = true;
return Status::OK();
}
@@ -247,90 +141,24 @@ Status DeltaWriter::build_rowset() {
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before
build_rowset() being called";
+ SCOPED_TIMER(_close_wait_timer);
RETURN_IF_ERROR(_memtable_writer.close_wait());
-
- // use rowset meta manager to save meta
- _cur_rowset = _rowset_writer->build();
- if (_cur_rowset == nullptr) {
- return Status::Error<MEM_ALLOC_FAILED>("fail to build rowset");
- }
- return Status::OK();
+ return _rowset_builder.build_rowset();
}
Status DeltaWriter::submit_calc_delete_bitmap_task() {
- if (!_tablet->enable_unique_key_merge_on_write()) {
- return Status::OK();
- }
-
- std::lock_guard<std::mutex> l(_lock);
- // tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
- LOG(INFO) << "tablet is under alter process, delete bitmap will be
calculated later, "
- "tablet_id: "
- << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
- return Status::OK();
- }
- auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get());
- std::vector<segment_v2::SegmentSharedPtr> segments;
- RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
- // tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
- return Status::OK();
- }
- if (segments.size() > 1) {
- // calculate delete bitmap between segments
-
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset,
segments,
-
_delete_bitmap));
- }
-
- // For partial update, we need to fill in the entire row of data, during
the calculation
- // of the delete bitmap. This operation is resource-intensive, and we need
to minimize
- // the number of times it occurs. Therefore, we skip this operation here.
- if (_cur_rowset->tablet_schema()->is_partial_update()) {
- return Status::OK();
- }
-
- LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " <<
_tablet->tablet_id()
- << ", txn_id: " << _req.txn_id;
- return _tablet->commit_phase_update_delete_bitmap(_cur_rowset,
_rowset_ids, _delete_bitmap,
- segments, _req.txn_id,
-
_calc_delete_bitmap_token.get(), nullptr);
+ return _rowset_builder.submit_calc_delete_bitmap_task();
}
Status DeltaWriter::wait_calc_delete_bitmap() {
- if (!_tablet->enable_unique_key_merge_on_write() ||
- _cur_rowset->tablet_schema()->is_partial_update()) {
- return Status::OK();
- }
- std::lock_guard<std::mutex> l(_lock);
- RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
-
RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap));
- LOG(INFO) << "Got result of calc delete bitmap task from executor,
tablet_id: "
- << _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
- return Status::OK();
+ return _rowset_builder.wait_calc_delete_bitmap();
}
Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes,
const bool write_single_replica) {
std::lock_guard<std::mutex> l(_lock);
- SCOPED_TIMER(_close_wait_timer);
- Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id,
_tablet, _req.txn_id,
- _req.load_id,
_cur_rowset, false);
-
- if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
- LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
- << " for rowset: " << _cur_rowset->rowset_id();
- return res;
- }
- if (_tablet->enable_unique_key_merge_on_write()) {
- _storage_engine->txn_manager()->set_txn_related_delete_bitmap(
- _req.partition_id, _req.txn_id, _tablet->tablet_id(),
_tablet->schema_hash(),
- _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids);
- }
-
- _delta_written_success = true;
+    SCOPED_TIMER(_commit_txn_timer);
+    // Propagate a failed commit to the caller instead of silently dropping it; otherwise
+    // the single-replica handling below would run for a txn that was never committed.
+    // NOTE(review): assumes RowsetBuilder::commit_txn() returns Status like the other
+    // RowsetBuilder entry points (init, build_rowset, ...) - confirm in rowset_builder.h.
+    RETURN_IF_ERROR(_rowset_builder.commit_txn());
if (write_single_replica) {
for (auto node_info : slave_tablet_nodes.slave_nodes()) {
@@ -344,7 +172,7 @@ bool DeltaWriter::check_slave_replicas_done(
google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>*
success_slave_tablet_node_ids) {
std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
if (_unfinished_slave_node.empty()) {
- success_slave_tablet_node_ids->insert({_tablet->tablet_id(),
_success_slave_node_ids});
+ success_slave_tablet_node_ids->insert({_req.tablet_id,
_success_slave_node_ids});
return true;
}
return false;
@@ -353,7 +181,7 @@ bool DeltaWriter::check_slave_replicas_done(
void DeltaWriter::add_finished_slave_replicas(
google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>*
success_slave_tablet_node_ids) {
std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
- success_slave_tablet_node_ids->insert({_tablet->tablet_id(),
_success_slave_node_ids});
+ success_slave_tablet_node_ids->insert({_req.tablet_id,
_success_slave_node_ids});
}
Status DeltaWriter::cancel() {
@@ -366,12 +194,6 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
return Status::OK();
}
RETURN_IF_ERROR(_memtable_writer.cancel_with_status(st));
- if (_rowset_writer && _rowset_writer->is_doing_segcompaction()) {
- _rowset_writer->wait_flying_segcompaction(); /* already cancel, ignore
the return status */
- }
- if (_calc_delete_bitmap_token != nullptr) {
- _calc_delete_bitmap_token->cancel();
- }
_is_cancelled = true;
return Status::OK();
}
@@ -380,35 +202,6 @@ int64_t DeltaWriter::mem_consumption(MemType mem) {
return _memtable_writer.mem_consumption(mem);
}
-void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
- const OlapTableSchemaParam*
table_schema_param,
- const TabletSchema&
ori_tablet_schema) {
- _tablet_schema->copy_from(ori_tablet_schema);
- // find the right index id
- int i = 0;
- auto indexes = table_schema_param->indexes();
- for (; i < indexes.size(); i++) {
- if (indexes[i]->index_id == index_id) {
- break;
- }
- }
-
- if (indexes.size() > 0 && indexes[i]->columns.size() != 0 &&
- indexes[i]->columns[0]->unique_id() >= 0) {
- _tablet_schema->build_current_tablet_schema(index_id,
table_schema_param->version(),
- indexes[i],
ori_tablet_schema);
- }
- if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version())
{
- _tablet->update_max_version_schema(_tablet_schema);
- }
-
- _tablet_schema->set_table_id(table_schema_param->table_id());
- // set partial update columns info
-
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
-
table_schema_param->partial_update_input_columns());
- _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
-}
-
void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
std::shared_ptr<PBackendService_Stub> stub =
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
@@ -417,19 +210,19 @@ void
DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
LOG(WARNING) << "failed to send pull rowset request to slave replica.
get rpc stub failed, "
"slave host="
<< node_info.host() << ", port=" <<
node_info.async_internal_port()
- << ", tablet_id=" << _tablet->tablet_id() << ", txn_id="
<< _req.txn_id;
+ << ", tablet_id=" << _req.tablet_id << ", txn_id=" <<
_req.txn_id;
return;
}
- _storage_engine->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id,
_tablet->tablet_id(),
- this);
+ _storage_engine->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id,
_req.tablet_id, this);
{
std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
_unfinished_slave_node.insert(node_info.id());
}
std::vector<int64_t> indices_ids;
- auto tablet_schema = _cur_rowset->rowset_meta()->tablet_schema();
+ auto cur_rowset = _rowset_builder.rowset();
+ auto tablet_schema = cur_rowset->rowset_meta()->tablet_schema();
if (!tablet_schema->skip_write_index_on_load()) {
for (auto& column : tablet_schema->columns()) {
const TabletIndex* index_meta =
tablet_schema->get_inverted_index(column.unique_id());
@@ -440,19 +233,18 @@ void
DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
}
PTabletWriteSlaveRequest request;
- RowsetMetaPB rowset_meta_pb = _cur_rowset->rowset_meta()->get_rowset_pb();
+ RowsetMetaPB rowset_meta_pb = cur_rowset->rowset_meta()->get_rowset_pb();
request.set_allocated_rowset_meta(&rowset_meta_pb);
request.set_host(BackendOptions::get_localhost());
request.set_http_port(config::webserver_port);
- string tablet_path = _tablet->tablet_path();
+ string tablet_path = _rowset_builder.tablet()->tablet_path();
request.set_rowset_path(tablet_path);
request.set_token(ExecEnv::GetInstance()->token());
request.set_brpc_port(config::brpc_port);
request.set_node_id(node_info.id());
- for (int segment_id = 0; segment_id <
_cur_rowset->rowset_meta()->num_segments();
- segment_id++) {
+ for (int segment_id = 0; segment_id <
cur_rowset->rowset_meta()->num_segments(); segment_id++) {
std::stringstream segment_name;
- segment_name << _cur_rowset->rowset_id() << "_" << segment_id <<
".dat";
+ segment_name << cur_rowset->rowset_id() << "_" << segment_id << ".dat";
int64_t segment_size = std::filesystem::file_size(tablet_path + "/" +
segment_name.str());
request.mutable_segments_size()->insert({segment_id, segment_size});
@@ -491,8 +283,8 @@ void
DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
LOG(WARNING) << "failed to send pull rowset request to slave replica,
error="
<< berror(closure->cntl.ErrorCode())
<< ", error_text=" << closure->cntl.ErrorText()
- << ". slave host: " << node_info.host()
- << ", tablet_id=" << _tablet->tablet_id() << ", txn_id="
<< _req.txn_id;
+ << ". slave host: " << node_info.host() << ", tablet_id="
<< _req.tablet_id
+ << ", txn_id=" << _req.txn_id;
std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
_unfinished_slave_node.erase(node_info.id());
}
@@ -508,13 +300,14 @@ void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t
node_id, bool is_succe
if (is_succeed) {
_success_slave_node_ids.add_slave_node_ids(node_id);
VLOG_CRITICAL << "record successful slave replica for txn [" <<
_req.txn_id
- << "], tablet_id=" << _tablet->tablet_id() << ",
node_id=" << node_id;
+ << "], tablet_id=" << _req.tablet_id << ", node_id=" <<
node_id;
}
_unfinished_slave_node.erase(node_id);
}
int64_t DeltaWriter::num_rows_filtered() const {
- return _rowset_writer == nullptr ? 0 : _rowset_writer->num_rows_filtered();
+ auto rowset_writer = _rowset_builder.rowset_writer();
+ return rowset_writer == nullptr ? 0 : rowset_writer->num_rows_filtered();
}
} // namespace doris
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index c0d98b3b28..4c9f0fc35a 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -30,9 +30,11 @@
#include <vector>
#include "common/status.h"
+#include "olap/delta_writer_context.h"
#include "olap/memtable_writer.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
+#include "olap/rowset_builder.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
@@ -54,14 +56,6 @@ namespace vectorized {
class Block;
} // namespace vectorized
-struct WriteRequest : MemTableWriter::WriteRequest {
- int32_t schema_hash;
- int64_t txn_id;
- int64_t partition_id;
- int64_t index_id = 0;
- OlapTableSchemaParam* table_schema_param;
-};
-
// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriter {
@@ -93,6 +87,8 @@ public:
void add_finished_slave_replicas(google::protobuf::Map<int64_t,
PSuccessSlaveTabletNodeIds>*
success_slave_tablet_node_ids);
+ void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
+
// abandon current memtable and wait for all pending-flushing memtables to
be destructed.
// mem_consumption() should be 0 after this function returns.
Status cancel();
@@ -109,14 +105,12 @@ public:
int64_t txn_id() const { return _req.txn_id; }
- void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
-
int64_t total_received_rows() const { return _total_received_rows; }
int64_t num_rows_filtered() const;
// For UT
- DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
+ DeleteBitmapPtr get_delete_bitmap() { return
_rowset_builder.get_delete_bitmap(); }
MemTableWriter* memtable_writer() { return &_memtable_writer; }
@@ -124,12 +118,6 @@ private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
const UniqueId& load_id);
- void _garbage_collection();
-
- void _build_current_tablet_schema(int64_t index_id,
- const OlapTableSchemaParam*
table_schema_param,
- const TabletSchema& ori_tablet_schema);
-
void _request_slave_tablet_pull_rowset(PNodeInfo node_info);
void _init_profile(RuntimeProfile* profile);
@@ -137,15 +125,10 @@ private:
bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
- TabletSharedPtr _tablet;
- RowsetSharedPtr _cur_rowset;
- std::shared_ptr<RowsetWriter> _rowset_writer;
+ RowsetBuilder _rowset_builder;
MemTableWriter _memtable_writer;
- TabletSchemaSPtr _tablet_schema;
- bool _delta_written_success;
StorageEngine* _storage_engine;
- UniqueId _load_id;
std::mutex _lock;
@@ -153,18 +136,12 @@ private:
PSuccessSlaveTabletNodeIds _success_slave_node_ids;
std::shared_mutex _slave_node_lock;
- DeleteBitmapPtr _delete_bitmap = nullptr;
- std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
- // current rowset_ids, used to do diff in publish_version
- RowsetIdUnorderedSet _rowset_ids;
- // current max version, used to calculate delete bitmap
- int64_t _cur_max_version;
-
// total rows num written by DeltaWriter
int64_t _total_received_rows = 0;
RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
+ RuntimeProfile::Counter* _commit_txn_timer = nullptr;
MonotonicStopWatch _lock_watch;
};
diff --git a/be/src/olap/delta_writer_context.h
b/be/src/olap/delta_writer_context.h
new file mode 100644
index 0000000000..c5c30b5ce7
--- /dev/null
+++ b/be/src/olap/delta_writer_context.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/types.pb.h>
+
+#include <vector>
+
+namespace doris {
+
+class TupleDescriptor;
+class SlotDescriptor;
+class OlapTableSchemaParam;
+
+// Parameters describing one tablet write; consumed by DeltaWriter, MemTableWriter
+// and RowsetBuilder.
+// Every scalar and pointer member has a default so that a default-constructed
+// request never carries indeterminate values.
+struct WriteRequest {
+    int64_t tablet_id = 0;
+    int32_t schema_hash = 0;
+    int64_t txn_id = 0;
+    int64_t partition_id = 0;
+    PUniqueId load_id;
+    TupleDescriptor* tuple_desc = nullptr;
+    // slots are in order of tablet's schema
+    const std::vector<SlotDescriptor*>* slots = nullptr;
+    bool is_high_priority = false;
+    OlapTableSchemaParam* table_schema_param = nullptr;
+    int64_t index_id = 0;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index b40d03a8ce..8caddb6d37 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -63,7 +63,7 @@ void MemTableWriter::_init_profile(RuntimeProfile* profile) {
_wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
_put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
- _close_wait_timer = ADD_TIMER(_profile, "MemTableWriterCloseWaitTime");
+ _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
_sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
_agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
_segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT);
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index ed09f61933..457534ce3a 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -30,6 +30,7 @@
#include <vector>
#include "common/status.h"
+#include "olap/delta_writer_context.h"
#include "olap/memtable.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
@@ -61,15 +62,6 @@ enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
// This class is NOT thread-safe, external synchronization is required.
class MemTableWriter {
public:
- struct WriteRequest {
- int64_t tablet_id;
- PUniqueId load_id;
- TupleDescriptor* tuple_desc;
- // slots are in order of tablet's schema
- const std::vector<SlotDescriptor*>* slots;
- bool is_high_priority = false;
- };
-
MemTableWriter(const WriteRequest& req, RuntimeProfile* profile);
~MemTableWriter();
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
new file mode 100644
index 0000000000..f92b1c96fb
--- /dev/null
+++ b/be/src/olap/rowset_builder.cpp
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset_builder.h"
+
+#include <brpc/controller.h>
+#include <fmt/format.h>
+
+#include <filesystem>
+#include <ostream>
+#include <string>
+#include <utility>
+
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
+#include "common/status.h"
+#include "exec/tablet_info.h"
+#include "gutil/strings/numbers.h"
+#include "io/fs/file_writer.h" // IWYU pragma: keep
+#include "olap/calc_delete_bitmap_executor.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/beta_rowset_writer.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/schema_change.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "olap/tablet_meta.h"
+#include "olap/txn_manager.h"
+#include "util/brpc_client_cache.h"
+#include "util/mem_info.h"
+#include "util/ref_count_closure.h"
+#include "util/stopwatch.hpp"
+#include "util/time.h"
+#include "vec/core/block.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+// Stores the write request and storage engine, allocates an empty request-level
+// TabletSchema and registers this builder's counters under `profile`.
+// The tablet itself is only looked up later, in init().
+RowsetBuilder::RowsetBuilder(const WriteRequest& req, StorageEngine* storage_engine,
+                             RuntimeProfile* profile)
+        : _req(req),
+          _tablet_schema(new TabletSchema),
+          _storage_engine(storage_engine) {
+    _init_profile(profile);
+}
+
+// Creates a per-tablet child profile under `profile` and registers the timers
+// used by build_rowset(), the delete-bitmap steps and commit_txn().
+void RowsetBuilder::_init_profile(RuntimeProfile* profile) {
+    const std::string child_name = fmt::format("RowsetBuilder {}", _req.tablet_id);
+    _profile = profile->create_child(child_name, true, true);
+
+    _build_rowset_timer = ADD_TIMER(_profile, "BuildRowsetTime");
+    _submit_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapSubmitTime");
+    _wait_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapWaitTime");
+    _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime");
+}
+
+// Cleans up after a builder whose init() completed; a builder that was never
+// initialized has nothing to undo.
+RowsetBuilder::~RowsetBuilder() {
+    if (!_is_init) {
+        return;
+    }
+
+    // initialized but never committed: roll the txn back and discard the rowset
+    if (!_is_committed) {
+        _garbage_collection();
+    }
+
+    if (_calc_delete_bitmap_token != nullptr) {
+        _calc_delete_bitmap_token->cancel();
+    }
+
+    if (_tablet == nullptr) {
+        return;
+    }
+    // drop this rowset's id from the data dir's pending set
+    const std::string pending_id = ROWSET_ID_PREFIX + _rowset_writer->rowset_id().to_string();
+    _tablet->data_dir()->remove_pending_ids(pending_id);
+}
+
+// Rolls back the txn for this tablet and, only if that succeeds, hands the
+// built rowset to the storage engine as unused.
+void RowsetBuilder::_garbage_collection() {
+    TxnManager* txn_manager = _storage_engine->txn_manager();
+    // without a tablet there is nothing to roll back, which is treated as success
+    const Status st = (_tablet == nullptr)
+                              ? Status::OK()
+                              : txn_manager->rollback_txn(_req.partition_id, _tablet, _req.txn_id);
+    // The rowset may have been committed in this thread and published in another one;
+    // rollback fails in that case and the rowset must not be deleted.
+    if (!st.ok()) {
+        return;
+    }
+    _storage_engine->add_unused_rowset(_rowset);
+}
+
+// Prepares the builder for writing:
+//   1. looks up the tablet,
+//   2. for merge-on-write tablets, snapshots the rowset ids and creates the delete bitmap,
+//   3. checks the version count (triggering a cumulative compaction when close to the limit),
+//   4. prepares the txn under the migration and push locks,
+//   5. builds the request-level schema and creates the rowset writer.
+// Returns a non-OK Status as soon as any step fails; _is_init is set only on success.
+Status RowsetBuilder::init() {
+    TabletManager* tablet_manager = _storage_engine->tablet_manager();
+    _tablet = tablet_manager->get_tablet(_req.tablet_id);
+    if (_tablet == nullptr) {
+        return Status::Error<TABLE_NOT_FOUND>("fail to find tablet. tablet_id={}, schema_hash={}",
+                                              _req.tablet_id, _req.schema_hash);
+    }
+
+    std::shared_ptr<MowContext> mow_ctx = nullptr;
+    // snapshot of the rowset ids, only needed for merge-on-write tablets
+    if (_tablet->enable_unique_key_merge_on_write()) {
+        std::lock_guard<std::shared_mutex> header_guard(_tablet->get_header_lock());
+        int64_t max_version = _tablet->max_version_unlocked().second;
+        // While the tablet is under alter process the delete bitmap is calculated after
+        // the conversion, so no rowset ids are recorded here.
+        const bool converting = _tablet->tablet_state() == TABLET_NOTREADY &&
+                                SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id());
+        if (!converting) {
+            _rowset_ids = _tablet->all_rs_id(max_version);
+        } else if (_req.table_schema_param->is_partial_update()) {
+            // 'partial_update' is disabled while a schema change is in progress
+            return Status::InternalError(
+                    "Unable to do 'partial_update' when "
+                    "the tablet is undergoing a 'schema changing process'");
+        } else {
+            _rowset_ids.clear();
+        }
+        _delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id());
+        mow_ctx = std::make_shared<MowContext>(max_version, _req.txn_id, _rowset_ids,
+                                               _delete_bitmap);
+    }
+
+    // version-count check: compact when within 100 versions of the limit, reject above it
+    if (!config::disable_auto_compaction &&
+        _tablet->exceed_version_limit(config::max_tablet_version_num - 100) &&
+        !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+        // trigger compaction
+        StorageEngine::instance()->submit_compaction_task(
+                _tablet, CompactionType::CUMULATIVE_COMPACTION, true);
+        const bool too_many_versions = _tablet->version_count() > config::max_tablet_version_num;
+        if (too_many_versions) {
+            return Status::Error<TOO_MANY_VERSION>(
+                    "failed to init rowset builder. version count: {}, exceed limit: {}, tablet: "
+                    "{}",
+                    _tablet->version_count(), config::max_tablet_version_num, _tablet->full_name());
+        }
+    }
+
+    {
+        // the migration lock is only tried, never waited for
+        std::shared_lock migration_guard(_tablet->get_migration_lock(), std::try_to_lock);
+        if (!migration_guard.owns_lock()) {
+            return Status::Error<TRY_LOCK_FAILED>("get lock failed");
+        }
+        std::lock_guard<std::mutex> push_guard(_tablet->get_push_lock());
+        TxnManager* txn_manager = _storage_engine->txn_manager();
+        RETURN_IF_ERROR(txn_manager->prepare_txn(_req.partition_id, _tablet, _req.txn_id,
+                                                 _req.load_id));
+    }
+
+    // request-level tablet schema
+    _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema());
+
+    RowsetWriterContext writer_ctx;
+    writer_ctx.txn_id = _req.txn_id;
+    writer_ctx.load_id = _req.load_id;
+    writer_ctx.rowset_state = PREPARED;
+    writer_ctx.segments_overlap = OVERLAPPING;
+    writer_ctx.tablet_schema = _tablet_schema;
+    writer_ctx.newest_write_timestamp = UnixSeconds();
+    writer_ctx.tablet_id = _tablet->tablet_id();
+    writer_ctx.tablet = _tablet;
+    writer_ctx.write_type = DataWriteType::TYPE_DIRECT;
+    writer_ctx.mow_context = mow_ctx;
+
+    std::unique_ptr<RowsetWriter> new_writer;
+    RETURN_IF_ERROR(_tablet->create_rowset_writer(writer_ctx, &new_writer));
+    _rowset_writer = std::move(new_writer);
+    _calc_delete_bitmap_token = _storage_engine->calc_delete_bitmap_executor()->create_token();
+
+    _is_init = true;
+    return Status::OK();
+}
+
+// Asks the rowset writer to build the rowset and stores it in _rowset.
+// Must be called after init(). Returns MEM_ALLOC_FAILED when the writer
+// yields no rowset.
+Status RowsetBuilder::build_rowset() {
+    std::lock_guard<std::mutex> guard(_lock);
+    DCHECK(_is_init)
+            << "rowset builder is supposed be to initialized before build_rowset() being called";
+
+    SCOPED_TIMER(_build_rowset_timer);
+    // use rowset meta manager to save meta
+    _rowset = _rowset_writer->build();
+    if (_rowset != nullptr) {
+        return Status::OK();
+    }
+    return Status::Error<MEM_ALLOC_FAILED>("fail to build rowset");
+}
+
+// Submits the delete-bitmap calculation for `_rowset` (merge-on-write tablets only).
+//
+// Steps, in order:
+//   1. Return OK immediately when the tablet does not use unique-key merge-on-write.
+//   2. Under `_lock`, return OK if the tablet is TABLET_NOTREADY and being converted.
+//   3. Load the rowset's segments, then re-check the tablet state, because the state
+//      is read again after the (potentially slow) segment load.
+//   4. With more than one segment, compute the delete bitmap between segments.
+//   5. Return OK without submitting anything for partial-update schemas.
+//   6. Otherwise hand the work to commit_phase_update_delete_bitmap() using
+//      `_calc_delete_bitmap_token`.
+//
+// Returns the first error from segment loading or bitmap calculation, else OK.
+Status RowsetBuilder::submit_calc_delete_bitmap_task() {
+    if (!_tablet->enable_unique_key_merge_on_write()) {
+        return Status::OK();
+    }
+    std::lock_guard<std::mutex> l(_lock);
+    SCOPED_TIMER(_submit_delete_bitmap_timer);
+    // tablet is under alter process. The delete bitmap will be calculated after conversion.
+    if (_tablet->tablet_state() == TABLET_NOTREADY &&
+        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+        LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
+                     "tablet_id: "
+                  << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
+        return Status::OK();
+    }
+    // Downcast within the Rowset hierarchy: static_cast is the correct named cast here,
+    // reinterpret_cast performs no base/derived pointer adjustment.
+    auto* beta_rowset = static_cast<BetaRowset*>(_rowset.get());
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
+    // tablet is under alter process. The delete bitmap will be calculated after conversion.
+    if (_tablet->tablet_state() == TABLET_NOTREADY &&
+        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+        return Status::OK();
+    }
+    if (segments.size() > 1) {
+        // calculate delete bitmap between segments
+        RETURN_IF_ERROR(
+                _tablet->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap));
+    }
+
+    // For partial update, we need to fill in the entire row of data, during the calculation
+    // of the delete bitmap. This operation is resource-intensive, and we need to minimize
+    // the number of times it occurs. Therefore, we skip this operation here.
+    if (_rowset->tablet_schema()->is_partial_update()) {
+        return Status::OK();
+    }
+
+    LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << _tablet->tablet_id()
+              << ", txn_id: " << _req.txn_id;
+    return _tablet->commit_phase_update_delete_bitmap(_rowset, _rowset_ids, _delete_bitmap,
+                                                      segments, _req.txn_id,
+                                                      _calc_delete_bitmap_token.get(), nullptr);
+}
+
+// Waits for the delete-bitmap task and copies its result into `_delete_bitmap`.
+// Does nothing (returns OK) when the tablet is not unique-key merge-on-write or the
+// rowset's schema is a partial update. Otherwise returns the first error from the
+// token's wait()/get_delete_bitmap(), or OK.
+Status RowsetBuilder::wait_calc_delete_bitmap() {
+    // Short-circuit evaluation: `_rowset` is only consulted for merge-on-write tablets.
+    const bool needs_wait = _tablet->enable_unique_key_merge_on_write() &&
+                            !_rowset->tablet_schema()->is_partial_update();
+    if (!needs_wait) {
+        return Status::OK();
+    }
+
+    std::lock_guard<std::mutex> guard(_lock);
+    SCOPED_TIMER(_wait_delete_bitmap_timer);
+    RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
+    RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap));
+    LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: "
+              << _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
+    return Status::OK();
+}
+
+// Commits `_rowset` for this request's transaction through the txn manager.
+// A PUSH_TRANSACTION_ALREADY_EXIST result is tolerated; any other failure is logged
+// and returned. For unique-key merge-on-write tablets the delete bitmap and rowset
+// ids are then attached to the transaction. Sets `_is_committed` on success.
+Status RowsetBuilder::commit_txn() {
+    std::lock_guard<std::mutex> guard(_lock);
+    SCOPED_TIMER(_commit_txn_timer);
+
+    Status commit_status = _storage_engine->txn_manager()->commit_txn(
+            _req.partition_id, _tablet, _req.txn_id, _req.load_id, _rowset, false);
+    if (!commit_status && !commit_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
+        LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
+                     << " for rowset: " << _rowset->rowset_id();
+        return commit_status;
+    }
+
+    if (_tablet->enable_unique_key_merge_on_write()) {
+        // Record the bitmap computed for this load alongside the transaction.
+        _storage_engine->txn_manager()->set_txn_related_delete_bitmap(
+                _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(),
+                _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids);
+    }
+
+    _is_committed = true;
+    return Status::OK();
+}
+
+// Marks the builder as cancelled and cancels the delete-bitmap token if one exists.
+// Repeated calls are no-ops. Always returns OK.
+Status RowsetBuilder::cancel() {
+    std::lock_guard<std::mutex> guard(_lock);
+    if (!_is_cancelled) {
+        if (_calc_delete_bitmap_token != nullptr) {
+            _calc_delete_bitmap_token->cancel();
+        }
+        _is_cancelled = true;
+    }
+    return Status::OK();
+}
+
+// Builds the request-level tablet schema in `_tablet_schema`.
+//
+// Starts from a copy of `ori_tablet_schema`, then, if `table_schema_param` contains an
+// index entry whose index_id equals `index_id` and whose first column has a non-negative
+// unique id, rebuilds the schema from that entry. If the resulting schema version is newer
+// than the original, the tablet's max-version schema is updated. Finally the table id,
+// partial-update info and strict-mode flag are copied from `table_schema_param`.
+//
+// @param index_id            id of the index to look up in `table_schema_param`
+// @param table_schema_param  schema parameters of the load request (must not be null)
+// @param ori_tablet_schema   schema to start from
+void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,
+                                                 const OlapTableSchemaParam* table_schema_param,
+                                                 const TabletSchema& ori_tablet_schema) {
+    _tablet_schema->copy_from(ori_tablet_schema);
+    // find the right index id
+    // Bind by reference: the list is only read, so copying it is unnecessary.
+    const auto& indexes = table_schema_param->indexes();
+    size_t i = 0;
+    for (; i < indexes.size(); ++i) {
+        if (indexes[i]->index_id == index_id) {
+            break;
+        }
+    }
+
+    // `i == indexes.size()` means no entry matched; guard so indexes[i] is never read
+    // out of bounds in that case.
+    if (i < indexes.size() && !indexes[i]->columns.empty() &&
+        indexes[i]->columns[0]->unique_id() >= 0) {
+        _tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(),
+                                                    indexes[i], ori_tablet_schema);
+    }
+    if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) {
+        _tablet->update_max_version_schema(_tablet_schema);
+    }
+
+    _tablet_schema->set_table_id(table_schema_param->table_id());
+    // set partial update columns info
+    _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
+                                            table_schema_param->partial_update_input_columns());
+    _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
+}
+
+} // namespace doris
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
new file mode 100644
index 0000000000..8bb94c2090
--- /dev/null
+++ b/be/src/olap/rowset_builder.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/delta_writer_context.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
+#include "util/spinlock.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class CalcDeleteBitmapToken;
+class FlushToken;
+class MemTable;
+class MemTracker;
+class StorageEngine;
+class TupleDescriptor;
+class SlotDescriptor;
+class OlapTableSchemaParam;
+class RowsetWriter;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+// Builds and commits the rowset for one write request (`WriteRequest`).
+// It owns the rowset writer created in init(), turns it into a rowset in
+// build_rowset(), handles the delete-bitmap task for unique-key merge-on-write
+// tablets, and commits the result to the txn manager in commit_txn().
+// This class is NOT thread-safe, external synchronization is required.
+// NOTE(review): build_rowset(), submit_calc_delete_bitmap_task(),
+// wait_calc_delete_bitmap(), commit_txn() and cancel() each take `_lock`, while the
+// accessors below do not -- confirm which guarantee callers may rely on.
+class RowsetBuilder {
+public:
+ // Stores the request, storage engine and profile used by the other methods.
+ RowsetBuilder(const WriteRequest& req, StorageEngine* storage_engine,
RuntimeProfile* profile);
+
+ ~RowsetBuilder();
+
+ // Prepares the txn, builds the request-level tablet schema and creates the
+ // rowset writer and delete-bitmap token. Sets `_is_init` on success.
+ Status init();
+
+ // Builds `_rowset` from the rowset writer. Requires init() to have succeeded.
+ Status build_rowset();
+
+ // Submits the delete-bitmap calculation for `_rowset`; returns OK without
+ // submitting when the tablet is not unique-key merge-on-write.
+ Status submit_calc_delete_bitmap_task();
+
+ // Waits for the submitted delete-bitmap task and fetches its result into
+ // `_delete_bitmap`.
+ Status wait_calc_delete_bitmap();
+
+ // Commits `_rowset` to the txn manager and sets `_is_committed` on success.
+ Status commit_txn();
+
+ // Cancels the delete-bitmap token (if any) and sets `_is_cancelled`.
+ Status cancel();
+
+ // Accessors; each returns a copy of the corresponding shared pointer.
+ std::shared_ptr<RowsetWriter> rowset_writer() const { return
_rowset_writer; }
+
+ TabletSharedPtr tablet() const { return _tablet; }
+
+ RowsetSharedPtr rowset() const { return _rowset; }
+
+ TabletSchemaSPtr tablet_schema() const { return _tablet_schema; }
+
+ // For UT
+ DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
+
+private:
+ // NOTE(review): definition not shown in this chunk; presumably cleans up an
+ // uncommitted rowset -- confirm in rowset_builder.cpp.
+ void _garbage_collection();
+
+ // Fills `_tablet_schema` from `ori_tablet_schema` and the index entry of
+ // `table_schema_param` that matches `index_id`.
+ void _build_current_tablet_schema(int64_t index_id,
+ const OlapTableSchemaParam*
table_schema_param,
+ const TabletSchema& ori_tablet_schema);
+
+ // NOTE(review): definition not shown in this chunk; presumably creates the
+ // timers declared below -- confirm in rowset_builder.cpp.
+ void _init_profile(RuntimeProfile* profile);
+
+ // Lifecycle flags: set at the end of init(), by cancel(), and by commit_txn().
+ bool _is_init = false;
+ bool _is_cancelled = false;
+ bool _is_committed = false;
+ // The write request this builder serves (txn id, load id, partition id, ...).
+ WriteRequest _req;
+ TabletSharedPtr _tablet;
+ // Result of build_rowset(); null until then.
+ RowsetSharedPtr _rowset;
+ // Created in init() through the tablet's create_rowset_writer().
+ std::shared_ptr<RowsetWriter> _rowset_writer;
+ // Request-level schema built by _build_current_tablet_schema().
+ TabletSchemaSPtr _tablet_schema;
+
+ StorageEngine* _storage_engine = nullptr;
+
+ // Taken by build_rowset(), submit_calc_delete_bitmap_task(),
+ // wait_calc_delete_bitmap(), commit_txn() and cancel().
+ std::mutex _lock;
+
+ // Delete bitmap for this load; handed to the txn manager in commit_txn().
+ DeleteBitmapPtr _delete_bitmap;
+ // Handle for the delete-bitmap task; created in init().
+ std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
+ // current rowset_ids, used to do diff in publish_version
+ RowsetIdUnorderedSet _rowset_ids;
+
+ // Profile counters; each timer is used with SCOPED_TIMER in the matching method.
+ RuntimeProfile* _profile = nullptr;
+ RuntimeProfile::Counter* _build_rowset_timer = nullptr;
+ RuntimeProfile::Counter* _submit_delete_bitmap_timer = nullptr;
+ RuntimeProfile::Counter* _wait_delete_bitmap_timer = nullptr;
+ RuntimeProfile::Counter* _commit_txn_timer = nullptr;
+};
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]