This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 5e28fde7db [cherry-pick](merge-on-write) support concurrent delete
bitmap calc while close_wait (#21488) (#22267)
5e28fde7db is described below
commit 5e28fde7db94b1a3b708a08f96b455ccbfa11ae8
Author: zhannngchen <[email protected]>
AuthorDate: Thu Jul 27 10:00:51 2023 +0800
[cherry-pick](merge-on-write) support concurrent delete bitmap calc while
close_wait (#21488) (#22267)
cherry-pick #21488
---
be/src/olap/calc_delete_bitmap_executor.cpp | 91 ++++++
be/src/olap/calc_delete_bitmap_executor.h | 91 ++++++
be/src/olap/delta_writer.cpp | 107 ++++---
be/src/olap/delta_writer.h | 11 +-
be/src/olap/full_compaction.cpp | 3 +-
be/src/olap/memtable.cpp | 2 +-
be/src/olap/olap_server.cpp | 5 -
be/src/olap/storage_engine.cpp | 7 +-
be/src/olap/storage_engine.h | 9 +-
be/src/olap/tablet.cpp | 75 +++--
be/src/olap/tablet.h | 5 +-
be/src/runtime/tablets_channel.cpp | 62 +++-
be/src/runtime/tablets_channel.h | 8 +-
be/test/olap/delta_writer_test.cpp | 338 ++++++++++++++++++---
.../olap/engine_storage_migration_task_test.cpp | 4 +-
be/test/olap/tablet_cooldown_test.cpp | 4 +-
16 files changed, 664 insertions(+), 158 deletions(-)
diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp
b/be/src/olap/calc_delete_bitmap_executor.cpp
new file mode 100644
index 0000000000..284c03c985
--- /dev/null
+++ b/be/src/olap/calc_delete_bitmap_executor.cpp
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/calc_delete_bitmap_executor.h"
+
+#include <gen_cpp/olap_file.pb.h>
+
+#include <ostream>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "olap/memtable.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr
cur_rowset,
+ const segment_v2::SegmentSharedPtr&
cur_segment,
+ const std::vector<RowsetSharedPtr>&
target_rowsets,
+ int64_t end_version, RowsetWriter*
rowset_writer) {
+ {
+ std::shared_lock rlock(_lock);
+ RETURN_IF_ERROR(_status);
+ }
+
+ DeleteBitmapPtr bitmap =
std::make_shared<DeleteBitmap>(tablet->tablet_id());
+ {
+ std::lock_guard wlock(_lock);
+ _delete_bitmaps.push_back(bitmap);
+ }
+ return _thread_token->submit_func([=, this]() {
+ auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment,
target_rowsets,
+ bitmap, end_version,
rowset_writer);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
+ << tablet->tablet_id() << " rowset: " <<
cur_rowset->rowset_id()
+ << " seg_id: " << cur_segment->id() << " version: "
<< end_version;
+ std::lock_guard wlock(_lock);
+ if (_status.ok()) {
+ _status = st;
+ }
+ }
+ });
+}
+
+Status CalcDeleteBitmapToken::wait() {
+ _thread_token->wait();
+ // all tasks complete here, don't need lock;
+ return _status;
+}
+
+Status CalcDeleteBitmapToken::get_delete_bitmap(DeleteBitmapPtr res_bitmap) {
+ std::lock_guard wlock(_lock);
+ RETURN_IF_ERROR(_status);
+
+ for (auto bitmap : _delete_bitmaps) {
+ res_bitmap->merge(*bitmap);
+ }
+ _delete_bitmaps.clear();
+ return Status::OK();
+}
+
+void CalcDeleteBitmapExecutor::init() {
+ ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool")
+ .set_min_threads(1)
+ .set_max_threads(config::calc_delete_bitmap_max_thread)
+ .build(&_thread_pool);
+}
+
+std::unique_ptr<CalcDeleteBitmapToken>
CalcDeleteBitmapExecutor::create_token() {
+ return std::make_unique<CalcDeleteBitmapToken>(
+ _thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
+}
+
+} // namespace doris
diff --git a/be/src/olap/calc_delete_bitmap_executor.h
b/be/src/olap/calc_delete_bitmap_executor.h
new file mode 100644
index 0000000000..d2c392a04d
--- /dev/null
+++ b/be/src/olap/calc_delete_bitmap_executor.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <atomic>
+#include <iosfwd>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "util/threadpool.h"
+
+namespace doris {
+
+class DataDir;
+class Tablet;
+enum RowsetTypePB : int;
+using TabletSharedPtr = std::shared_ptr<Tablet>;
+
+// A thin wrapper of ThreadPoolToken to submit calc delete bitmap task.
+// Usage:
+// 1. create a token
+// 2. submit delete bitmap calculate tasks
+// 3. wait all tasks complete
+// 4. call `get_delete_bitmap()` to get the result of all tasks
+class CalcDeleteBitmapToken {
+public:
+ explicit CalcDeleteBitmapToken(std::unique_ptr<ThreadPoolToken>
thread_token)
+ : _thread_token(std::move(thread_token)), _status(Status::OK()) {}
+
+ Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
+ const segment_v2::SegmentSharedPtr& cur_segment,
+ const std::vector<RowsetSharedPtr>& target_rowsets, int64_t
end_version,
+ RowsetWriter* rowset_writer);
+
+ // wait all tasks in token to be completed.
+ Status wait();
+
+ void cancel() { _thread_token->shutdown(); }
+
+ Status get_delete_bitmap(DeleteBitmapPtr res_bitmap);
+
+private:
+ std::unique_ptr<ThreadPoolToken> _thread_token;
+
+ std::shared_mutex _lock;
+ std::vector<DeleteBitmapPtr> _delete_bitmaps;
+
+ // Records the current status of the calc delete bitmap job.
+ // Note: Once its value is set to Failed, it cannot return to SUCCESS.
+ Status _status;
+};
+
+// CalcDeleteBitmapExecutor is responsible for calculating delete bitmaps concurrently.
+// It encapsulates a ThreadPool to handle all tasks.
+class CalcDeleteBitmapExecutor {
+public:
+ CalcDeleteBitmapExecutor() {}
+ ~CalcDeleteBitmapExecutor() { _thread_pool->shutdown(); }
+
+ // init should be called after storage engine is opened,
+ void init();
+
+ std::unique_ptr<CalcDeleteBitmapToken> create_token();
+
+private:
+ std::unique_ptr<ThreadPool> _thread_pool;
+};
+
+} // namespace doris
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 197ecb417e..dd5b3c74f6 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -125,6 +125,10 @@ DeltaWriter::~DeltaWriter() {
}
}
+ if (_calc_delete_bitmap_token != nullptr) {
+ _calc_delete_bitmap_token->cancel();
+ }
+
if (_tablet != nullptr) {
_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
_rowset_writer->rowset_id().to_string());
@@ -225,6 +229,7 @@ Status DeltaWriter::init() {
bool should_serial = false;
RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token(
&_flush_token, _rowset_writer->type(), should_serial,
_req.is_high_priority));
+ _calc_delete_bitmap_token =
_storage_engine->calc_delete_bitmap_executor()->create_token();
_is_init = true;
return Status::OK();
@@ -411,12 +416,10 @@ Status DeltaWriter::close() {
}
}
-Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
- const bool write_single_replica) {
- SCOPED_TIMER(_close_wait_timer);
+Status DeltaWriter::build_rowset() {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
- << "delta writer is supposed be to initialized before close_wait()
being called";
+ << "delta writer is supposed be to initialized before
build_rowset() being called";
if (_is_cancelled) {
return _cancel_status;
@@ -447,43 +450,68 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes&
slave_tablet_nodes,
if (_cur_rowset == nullptr) {
return Status::Error<MEM_ALLOC_FAILED>("fail to build rowset");
}
+ return Status::OK();
+}
- if (_tablet->enable_unique_key_merge_on_write()) {
- // tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
- LOG(INFO) << "tablet is under alter process, delete bitmap will be
calculated later, "
- "tablet_id: "
- << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
- } else {
- auto beta_rowset =
reinterpret_cast<BetaRowset*>(_cur_rowset.get());
- std::vector<segment_v2::SegmentSharedPtr> segments;
- RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
- if (segments.size() > 1) {
- // calculate delete bitmap between segments
-
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset,
segments,
-
_delete_bitmap));
- }
+Status DeltaWriter::submit_calc_delete_bitmap_task() {
+ if (!_tablet->enable_unique_key_merge_on_write()) {
+ return Status::OK();
+ }
- // commit_phase_update_delete_bitmap() may generate new segments,
we need to create a new
- // transient rowset writer to write the new segments, then merge
it back the original
- // rowset.
- std::unique_ptr<RowsetWriter> rowset_writer;
- _tablet->create_transient_rowset_writer(_cur_rowset,
&rowset_writer);
- RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(
- _cur_rowset, _rowset_ids, _delete_bitmap, segments,
_req.txn_id,
- rowset_writer.get()));
- if (_cur_rowset->tablet_schema()->is_partial_update()) {
- // build rowset writer and merge transient rowset
- RETURN_IF_ERROR(rowset_writer->flush());
- RowsetSharedPtr transient_rowset = rowset_writer->build();
-
_cur_rowset->merge_rowset_meta(transient_rowset->rowset_meta());
-
- // erase segment cache cause we will add a segment to rowset
-
SegmentLoader::instance()->erase_segment(_cur_rowset->rowset_id());
- }
- }
+ std::lock_guard<std::mutex> l(_lock);
+ // tablet is under alter process. The delete bitmap will be calculated
after conversion.
+ if (_tablet->tablet_state() == TABLET_NOTREADY &&
+ SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+ LOG(INFO) << "tablet is under alter process, delete bitmap will be
calculated later, "
+ "tablet_id: "
+ << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
+ return Status::OK();
}
+ auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get());
+ std::vector<segment_v2::SegmentSharedPtr> segments;
+ RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
+ // tablet is under alter process. The delete bitmap will be calculated
after conversion.
+ if (_tablet->tablet_state() == TABLET_NOTREADY &&
+ SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+ return Status::OK();
+ }
+ if (segments.size() > 1) {
+ // calculate delete bitmap between segments
+
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset,
segments,
+
_delete_bitmap));
+ }
+
+ // For partial update, we need to fill in the entire row of data during
+ // the calculation of the delete bitmap. This operation is
+ // resource-intensive, and we need to minimize the number of times it
+ // occurs. Therefore, we skip this operation here.
+ if (_cur_rowset->tablet_schema()->is_partial_update()) {
+ return Status::OK();
+ }
+
+ LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " <<
_tablet->tablet_id()
+ << ", txn_id: " << _req.txn_id;
+ return _tablet->commit_phase_update_delete_bitmap(_cur_rowset,
_rowset_ids, _delete_bitmap,
+ segments, _req.txn_id,
+
_calc_delete_bitmap_token.get(), nullptr);
+}
+
+Status DeltaWriter::wait_calc_delete_bitmap() {
+ if (!_tablet->enable_unique_key_merge_on_write() ||
+ _cur_rowset->tablet_schema()->is_partial_update()) {
+ return Status::OK();
+ }
+ std::lock_guard<std::mutex> l(_lock);
+ RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
+
RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap));
+ LOG(INFO) << "Got result of calc delete bitmap task from executor,
tablet_id: "
+ << _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
+ return Status::OK();
+}
+
+Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes,
+ const bool write_single_replica) {
+ std::lock_guard<std::mutex> l(_lock);
+ SCOPED_TIMER(_close_wait_timer);
Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id,
_tablet, _req.txn_id,
_req.load_id,
_cur_rowset, false);
@@ -550,6 +578,9 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
// cancel and wait all memtables in flush queue to be finished
_flush_token->cancel();
}
+ if (_calc_delete_bitmap_token != nullptr) {
+ _calc_delete_bitmap_token->cancel();
+ }
_is_cancelled = true;
_cancel_status = st;
return Status::OK();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index daf091bf8e..aeef1c6866 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -89,11 +89,14 @@ public:
Status append(const vectorized::Block* block);
- // flush the last memtable to flush queue, must call it before close_wait()
+ // flush the last memtable to flush queue, must call it before
build_rowset()
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
- Status close_wait(const PSlaveTabletNodes& slave_tablet_nodes, const bool
write_single_replica);
+ Status build_rowset();
+ Status submit_calc_delete_bitmap_task();
+ Status wait_calc_delete_bitmap();
+ Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool
write_single_replica);
bool check_slave_replicas_done(google::protobuf::Map<int64_t,
PSuccessSlaveTabletNodeIds>*
success_slave_tablet_node_ids);
@@ -131,6 +134,9 @@ public:
int64_t num_rows_filtered() const;
+ // For UT
+ DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
+
private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
const UniqueId& load_id);
@@ -183,6 +189,7 @@ private:
std::shared_mutex _slave_node_lock;
DeleteBitmapPtr _delete_bitmap = nullptr;
+ std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
// current rowset_ids, used to do diff in publish_version
RowsetIdUnorderedSet _rowset_ids;
// current max version, used to calculate delete bitmap
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index e2acd1cdb0..cdec9c09ed 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -193,7 +193,8 @@ Status
FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr
OlapStopWatch watch;
RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments,
specified_rowsets,
- delete_bitmap, cur_version,
rowset_writer));
+ delete_bitmap, cur_version,
nullptr,
+ rowset_writer));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 9716f265d3..78860a1f65 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -480,7 +480,7 @@ Status MemTable::_generate_delete_bitmap(int32_t
segment_id) {
OlapStopWatch watch;
RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments,
specified_rowsets,
_mow_context->delete_bitmap,
- _mow_context->max_version));
+ _mow_context->max_version,
nullptr));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index c1af281465..67c41c6f7e 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -243,11 +243,6 @@ Status StorageEngine::start_bg_threads() {
.set_max_threads(config::tablet_publish_txn_max_thread)
.build(&_tablet_publish_txn_thread_pool);
- ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool")
- .set_min_threads(1)
- .set_max_threads(config::calc_delete_bitmap_max_thread)
- .build(&_calc_delete_bitmap_thread_pool);
-
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "aync_publish_version_thread",
[this]() { this->_async_publish_callback(); },
&_async_publish_thread));
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 745637a296..340f02b65b 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -126,6 +126,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_txn_manager(new TxnManager(config::txn_map_shard_size,
config::txn_shard_size)),
_rowset_id_generator(new
UniqueRowsetIdGenerator(options.backend_uid)),
_memtable_flush_executor(nullptr),
+ _calc_delete_bitmap_executor(nullptr),
_default_rowset_type(BETA_ROWSET),
_heartbeat_flags(nullptr),
_stream_load_recorder(nullptr) {
@@ -155,9 +156,6 @@ StorageEngine::~StorageEngine() {
if (_tablet_meta_checkpoint_thread_pool) {
_tablet_meta_checkpoint_thread_pool->shutdown();
}
- if (_calc_delete_bitmap_thread_pool) {
- _calc_delete_bitmap_thread_pool->shutdown();
- }
if (_cold_data_compaction_thread_pool) {
_cold_data_compaction_thread_pool->shutdown();
}
@@ -199,6 +197,9 @@ Status StorageEngine::_open() {
_memtable_flush_executor.reset(new MemTableFlushExecutor());
_memtable_flush_executor->init(dirs);
+ _calc_delete_bitmap_executor.reset(new CalcDeleteBitmapExecutor());
+ _calc_delete_bitmap_executor->init();
+
_parse_default_rowset_type();
return Status::OK();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index a40e1792d9..7af2fde0fb 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -37,6 +37,7 @@
#include "common/status.h"
#include "gutil/ref_counted.h"
+#include "olap/calc_delete_bitmap_executor.h"
#include "olap/compaction_permit_limiter.h"
#include "olap/olap_common.h"
#include "olap/options.h"
@@ -143,6 +144,9 @@ public:
TabletManager* tablet_manager() { return _tablet_manager.get(); }
TxnManager* txn_manager() { return _txn_manager.get(); }
MemTableFlushExecutor* memtable_flush_executor() { return
_memtable_flush_executor.get(); }
+ CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
+ return _calc_delete_bitmap_executor.get();
+ }
bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);
@@ -210,9 +214,6 @@ public:
std::unique_ptr<ThreadPool>& tablet_publish_txn_thread_pool() {
return _tablet_publish_txn_thread_pool;
}
- std::unique_ptr<ThreadPool>& calc_delete_bitmap_thread_pool() {
- return _calc_delete_bitmap_thread_pool;
- }
bool stopped() { return _stopped; }
ThreadPool* get_bg_multiget_threadpool() { return
_bg_multi_get_thread_pool.get(); }
@@ -423,6 +424,7 @@ private:
std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
+ std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
// Used to control the migration from segment_v1 to segment_v2, can be
deleted in futrue.
// Type of new loaded data
@@ -437,7 +439,6 @@ private:
std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
- std::unique_ptr<ThreadPool> _calc_delete_bitmap_thread_pool;
std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
std::unique_ptr<ThreadPool> _bg_multi_get_thread_pool;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1cd2e0427f..19c8a8467e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2992,11 +2992,15 @@ Status
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
return Status::OK();
}
+// If the caller passes a token, every calculation task is submitted to a
+// thread pool and the caller collects all delete bitmaps from that token.
+// If `token` is nullptr, the calculation runs in the calling thread and the
+// result is available in `delete_bitmap` directly.
Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
const
std::vector<segment_v2::SegmentSharedPtr>& segments,
const std::vector<RowsetSharedPtr>&
specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t
end_version,
- RowsetWriter* rowset_writer) {
+ CalcDeleteBitmapToken* token, RowsetWriter*
rowset_writer) {
auto rowset_id = rowset->rowset_id();
if (specified_rowsets.empty() || segments.empty()) {
LOG(INFO) << "skip to construct delete bitmap tablet: " << tablet_id()
@@ -3005,37 +3009,31 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr
rowset,
}
OlapStopWatch watch;
+ doris::TabletSharedPtr tablet_ptr =
+
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id());
+ if (tablet_ptr == nullptr) {
+ return Status::InternalError("Can't find tablet id: {}, maybe already
dropped.",
+ tablet_id());
+ }
std::vector<DeleteBitmapPtr> seg_delete_bitmaps;
- std::unique_ptr<ThreadPoolToken> token =
-
StorageEngine::instance()->calc_delete_bitmap_thread_pool()->new_token(
- ThreadPool::ExecutionMode::CONCURRENT);
- std::atomic<int> calc_status {ErrorCode::OK};
- for (size_t i = 1; i < segments.size(); i++) {
+ for (size_t i = 0; i < segments.size(); i++) {
auto& seg = segments[i];
- DeleteBitmapPtr seg_delete_bitmap =
std::make_shared<DeleteBitmap>(tablet_id());
- seg_delete_bitmaps.push_back(seg_delete_bitmap);
- RETURN_IF_ERROR(token->submit_func([=, &calc_status, this]() {
- auto st = calc_segment_delete_bitmap(rowset, seg,
specified_rowsets, seg_delete_bitmap,
- end_version, rowset_writer);
- if (!st.ok()) {
- LOG(WARNING) << "failed to calc segment delete bitmap,
tablet_id: " << tablet_id()
- << " rowset: " << rowset_id << " seg_id: " <<
seg->id()
- << " version: " << end_version;
- calc_status.store(st.code());
- }
- }));
+ if (token != nullptr) {
+ RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg,
specified_rowsets, end_version,
+ rowset_writer));
+ } else {
+ DeleteBitmapPtr seg_delete_bitmap =
std::make_shared<DeleteBitmap>(tablet_id());
+ seg_delete_bitmaps.push_back(seg_delete_bitmap);
+ RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i],
specified_rowsets,
+ seg_delete_bitmap,
end_version,
+ rowset_writer));
+ }
}
- // this thread calc delete bitmap of segment 0
- RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[0],
specified_rowsets,
- delete_bitmap, end_version,
rowset_writer));
- token->wait();
- auto code = calc_status.load();
- if (code != ErrorCode::OK) {
- return Status::Error(code, "Tablet::calc_delete_bitmap meet error");
- }
- for (auto seg_delete_bitmap : seg_delete_bitmaps) {
- delete_bitmap->merge(*seg_delete_bitmap);
+ if (token == nullptr) {
+ for (auto seg_delete_bitmap : seg_delete_bitmaps) {
+ delete_bitmap->merge(*seg_delete_bitmap);
+ }
}
return Status::OK();
}
@@ -3212,8 +3210,11 @@ Status Tablet::update_delete_bitmap_without_lock(const
RowsetSharedPtr& rowset)
std::vector<RowsetSharedPtr> specified_rowsets =
get_rowset_by_ids(&cur_rowset_ids);
OlapStopWatch watch;
+ auto token =
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets,
delete_bitmap,
- cur_version - 1));
+ cur_version - 1, token.get()));
+ token->wait();
+ token->get_delete_bitmap(delete_bitmap);
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
@@ -3233,7 +3234,7 @@ Status Tablet::update_delete_bitmap_without_lock(const
RowsetSharedPtr& rowset)
Status Tablet::commit_phase_update_delete_bitmap(
const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids,
DeleteBitmapPtr delete_bitmap, const
std::vector<segment_v2::SegmentSharedPtr>& segments,
- int64_t txn_id, RowsetWriter* rowset_writer) {
+ int64_t txn_id, CalcDeleteBitmapToken* token, RowsetWriter*
rowset_writer) {
SCOPED_BVAR_LATENCY(g_tablet_commit_phase_update_delete_bitmap_latency);
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
@@ -3247,19 +3248,14 @@ Status Tablet::commit_phase_update_delete_bitmap(
cur_rowset_ids = all_rs_id(cur_version);
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids,
&rowset_ids_to_add,
&rowset_ids_to_del);
- if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) {
- LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
- << ", rowset_ids_to_del: " << rowset_ids_to_del.size();
- }
specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
}
for (const auto& to_del : rowset_ids_to_del) {
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
}
- OlapStopWatch watch;
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets,
delete_bitmap,
- cur_version, rowset_writer));
+ cur_version, token, rowset_writer));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
@@ -3267,7 +3263,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", cur max_version: " << cur_version << ", transaction_id: "
<< txn_id
- << ", cost: " << watch.get_elapse_time_us() << "(us), total
rows: " << total_rows;
+ << ", total rows: " << total_rows;
pre_rowset_ids = cur_rowset_ids;
return Status::OK();
}
@@ -3309,8 +3305,11 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
}
OlapStopWatch watch;
+ auto token =
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets,
delete_bitmap,
- cur_version - 1, rowset_writer));
+ cur_version - 1, token.get(),
rowset_writer));
+ token->wait();
+ token->get_delete_bitmap(delete_bitmap);
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index adc06de339..3efd2c89ce 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -70,6 +70,7 @@ class RowIdConversion;
class TTabletInfo;
class TabletMetaPB;
class TupleDescriptor;
+class CalcDeleteBitmapToken;
enum CompressKind : int;
namespace io {
@@ -446,7 +447,7 @@ public:
const std::vector<segment_v2::SegmentSharedPtr>&
segments,
const std::vector<RowsetSharedPtr>&
specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t version,
- RowsetWriter* rowset_writer = nullptr);
+ CalcDeleteBitmapToken* token, RowsetWriter*
rowset_writer = nullptr);
std::vector<RowsetSharedPtr> get_rowset_by_ids(
const RowsetIdUnorderedSet* specified_rowset_ids);
@@ -479,7 +480,7 @@ public:
const RowsetSharedPtr& rowset, RowsetIdUnorderedSet&
pre_rowset_ids,
DeleteBitmapPtr delete_bitmap,
const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t
txn_id,
- RowsetWriter* rowset_writer = nullptr);
+ CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer =
nullptr);
Status update_delete_bitmap(const RowsetSharedPtr& rowset,
const RowsetIdUnorderedSet& pre_rowset_ids,
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 3862fd533e..714aff369e 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -166,9 +166,9 @@ Status TabletsChannel::close(
// just skip this tablet(writer) and continue to close
others
continue;
}
- // to make sure tablet writer in `_broken_tablets` won't call
`close_wait` method.
- // `close_wait` might create the rowset and commit txn
directly, and the subsequent
- // publish version task will success, which can cause the
replica inconsistency.
+ // A tablet writer in `_broken_tablets` must not call `build_rowset` or
+ // `commit_txn`: otherwise the subsequent publish-version task would
+ // succeed, which can cause replica inconsistency.
if (_is_broken_tablet(it.second->tablet_id())) {
LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is
broken but not cancelled"
<< ", tablet_id=" << it.first << ",
transaction_id=" << _txn_id;
@@ -190,7 +190,39 @@ Status TabletsChannel::close(
_write_single_replica = write_single_replica;
- // 2. wait delta writers and build the tablet vector
+ // 2. wait all writer finished flush.
+ for (auto writer : need_wait_writers) {
+ writer->wait_flush();
+ }
+
+ // 3. build rowset
+ for (auto it = need_wait_writers.begin(); it !=
need_wait_writers.end(); it++) {
+ Status st = (*it)->build_rowset();
+ if (!st.ok()) {
+ _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
+ it = need_wait_writers.erase(it);
+ continue;
+ }
+ // 3.1 calculate delete bitmap for Unique Key MoW tables
+ st = (*it)->submit_calc_delete_bitmap_task();
+ if (!st.ok()) {
+ _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
+ it = need_wait_writers.erase(it);
+ continue;
+ }
+ }
+
+ // 4. wait for delete bitmap calculation complete if necessary
+ for (auto it = need_wait_writers.begin(); it !=
need_wait_writers.end(); it++) {
+ Status st = (*it)->wait_calc_delete_bitmap();
+ if (!st.ok()) {
+ _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
+ it = need_wait_writers.erase(it);
+ continue;
+ }
+ }
+
+ // 5. commit all writers
for (auto writer : need_wait_writers) {
PSlaveTabletNodes slave_nodes;
if (write_single_replica) {
@@ -198,7 +230,7 @@ Status TabletsChannel::close(
}
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE
judge it.
- _close_wait(writer, tablet_vec, tablet_errors, slave_nodes,
write_single_replica);
+ _commit_txn(writer, tablet_vec, tablet_errors, slave_nodes,
write_single_replica);
}
if (write_single_replica) {
@@ -229,12 +261,12 @@ Status TabletsChannel::close(
return Status::OK();
}
-void TabletsChannel::_close_wait(DeltaWriter* writer,
+void TabletsChannel::_commit_txn(DeltaWriter* writer,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
PSlaveTabletNodes slave_tablet_nodes,
const bool write_single_replica) {
- Status st = writer->close_wait(slave_tablet_nodes, write_single_replica);
+ Status st = writer->commit_txn(slave_tablet_nodes, write_single_replica);
if (st.ok()) {
PTabletInfo* tablet_info = tablet_vec->Add();
tablet_info->set_tablet_id(writer->tablet_id());
@@ -242,14 +274,20 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
tablet_info->set_received_rows(writer->total_received_rows());
tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
} else {
- PTabletError* tablet_error = tablet_errors->Add();
- tablet_error->set_tablet_id(writer->tablet_id());
- tablet_error->set_msg(st.to_string());
- VLOG_PROGRESS << "close wait failed tablet " << writer->tablet_id() <<
" transaction_id "
- << _txn_id << "err msg " << st;
+ _add_error_tablet(tablet_errors, writer->tablet_id(), st);
}
}
+// Append an entry for `tablet_id` to `tablet_errors`, carrying the textual
+// form of `error`, and log the failure together with this channel's
+// transaction id.
+void TabletsChannel::_add_error_tablet(
+        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id,
+        Status error) {
+    PTabletError* tablet_error = tablet_errors->Add();
+    tablet_error->set_tablet_id(tablet_id);
+    tablet_error->set_msg(error.to_string());
+    // Each fragment starts with a space so the streamed values do not run
+    // into the following label.
+    VLOG_PROGRESS << "close wait failed tablet " << tablet_id << " transaction_id " << _txn_id
+                  << " err msg " << error;
+}
+
int64_t TabletsChannel::mem_consumption() {
int64_t mem_usage = 0;
{
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 9a06653c74..e7c05a1998 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -126,14 +126,16 @@ private:
// open all writer
Status _open_all_writers(const PTabletWriterOpenRequest& request);
- // deal with DeltaWriter close_wait(), add tablet to list for return.
- void _close_wait(DeltaWriter* writer,
+ // deal with DeltaWriter commit_txn(), add tablet to list for return.
+ void _commit_txn(DeltaWriter* writer,
google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec,
- google::protobuf::RepeatedPtrField<PTabletError>*
tablet_error,
+ google::protobuf::RepeatedPtrField<PTabletError>*
tablet_errors,
PSlaveTabletNodes slave_tablet_nodes, const bool
write_single_replica);
void _build_partition_tablets_relation(const PTabletWriterOpenRequest&
request);
void _add_broken_tablet(int64_t tablet_id);
+ void _add_error_tablet(google::protobuf::RepeatedPtrField<PTabletError>*
tablet_errors,
+ int64_t tablet_id, Status error);
bool _is_broken_tablet(int64_t tablet_id);
void _init_profile(RuntimeProfile* profile);
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index f1d5f49619..9fd998ebd0 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -261,7 +261,8 @@ static void create_tablet_request(int64_t tablet_id,
int32_t schema_hash,
}
static void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t
schema_hash,
- TCreateTabletReq* request)
{
+ TCreateTabletReq* request,
+ bool enable_mow = false) {
request->tablet_id = tablet_id;
request->__set_version(1);
request->tablet_schema.schema_hash = schema_hash;
@@ -270,6 +271,7 @@ static void create_tablet_request_with_sequence_col(int64_t
tablet_id, int32_t s
request->tablet_schema.storage_type = TStorageType::COLUMN;
request->tablet_schema.__set_sequence_col_idx(4);
request->__set_storage_format(TStorageFormat::V2);
+ request->__set_enable_unique_key_merge_on_write(enable_mow);
TColumn k1;
k1.column_name = "k1";
@@ -434,6 +436,28 @@ static TDescriptorTable
create_descriptor_tablet_with_sequence_col() {
return dtb.desc_tbl();
}
+// Append one value to each of the first five columns of `block`:
+//   column 0 <- k1, column 1 <- k2,
+//   column 2 <- the datetime "2020-07-16 19:39:43" as int64,
+//   column 3 <- the date 2022-06-06 as uint32,
+//   column 4 <- seq.
+// Every insert_data() call passes the size of the exact object the pointer
+// refers to, so the length always matches the bytes being handed over.
+static void generate_data(vectorized::Block* block, int8_t k1, int16_t k2, int32_t seq) {
+    auto columns = block->mutate_columns();
+
+    columns[0]->insert_data(reinterpret_cast<const char*>(&k1), sizeof(k1));
+    columns[1]->insert_data(reinterpret_cast<const char*>(&k2), sizeof(k2));
+
+    vectorized::VecDateTimeValue c3;
+    c3.from_date_str("2020-07-16 19:39:43", 19);
+    int64_t c3_int = c3.to_int64();
+    columns[2]->insert_data(reinterpret_cast<const char*>(&c3_int), sizeof(c3_int));
+
+    doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4;
+    c4.set_time(2022, 6, 6, 0, 0, 0, 0);
+    uint32_t c4_int = c4.to_date_int_val();
+    columns[3]->insert_data(reinterpret_cast<const char*>(&c4_int), sizeof(c4_int));
+
+    // seq is a 32-bit value; its own size is passed, not that of the 16-bit k2.
+    columns[4]->insert_data(reinterpret_cast<const char*>(&seq), sizeof(seq));
+}
+
class TestDeltaWriter : public ::testing::Test {
public:
TestDeltaWriter() {}
@@ -473,7 +497,9 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_NE(delta_writer, nullptr);
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->close_wait(PSlaveTabletNodes(), false);
+ res = delta_writer->build_rowset();
+ EXPECT_EQ(Status::OK(), res);
+ res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);
@@ -593,7 +619,15 @@ TEST_F(TestDeltaWriter, vec_write) {
res = delta_writer->close();
ASSERT_TRUE(res.ok());
- res = delta_writer->close_wait(PSlaveTabletNodes(), false);
+ res = delta_writer->wait_flush();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->build_rowset();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->submit_calc_delete_bitmap_task();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->wait_calc_delete_bitmap();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
ASSERT_TRUE(res.ok());
// publish version success
@@ -661,54 +695,25 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
slot_desc->col_name()));
}
- auto columns = block.mutate_columns();
- {
- int8_t c1 = 123;
- columns[0]->insert_data((const char*)&c1, sizeof(c1));
-
- int16_t c2 = 456;
- columns[1]->insert_data((const char*)&c2, sizeof(c2));
-
- vectorized::VecDateTimeValue c3;
- c3.from_date_str("2020-07-16 19:39:43", 19);
- int64_t c3_int = c3.to_int64();
- columns[2]->insert_data((const char*)&c3_int, sizeof(c3));
-
- doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4;
- c4.set_time(2022, 6, 6, 0, 0, 0, 0);
- uint32_t c4_int = c4.to_date_int_val();
- columns[3]->insert_data((const char*)&c4_int, sizeof(c4));
-
- int32_t c5 = 100;
- columns[4]->insert_data((const char*)&c5, sizeof(c2));
- res = delta_writer->write(&block, {0});
- ASSERT_TRUE(res.ok());
- }
- {
- int8_t c1 = 123;
- columns[0]->insert_data((const char*)&c1, sizeof(c1));
-
- int16_t c2 = 456;
- columns[1]->insert_data((const char*)&c2, sizeof(c2));
-
- vectorized::VecDateTimeValue c3;
- c3.from_date_str("2020-07-31 19:39:43", 19);
- int64_t c3_int = c3.to_int64();
- columns[2]->insert_data((const char*)&c3_int, sizeof(c3));
+ generate_data(&block, 123, 456, 100);
+ res = delta_writer->write(&block, {0});
+ ASSERT_TRUE(res.ok());
- doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4;
- c4.set_time(2022, 7, 6, 0, 0, 0, 0);
- uint32_t c4_int = c4.to_date_int_val();
- columns[3]->insert_data((const char*)&c4_int, sizeof(c4));
+ generate_data(&block, 123, 456, 90);
+ res = delta_writer->write(&block, {1});
+ ASSERT_TRUE(res.ok());
- int32_t c5 = 90;
- columns[4]->insert_data((const char*)&c5, sizeof(c2));
- res = delta_writer->write(&block, {1});
- ASSERT_TRUE(res.ok());
- }
res = delta_writer->close();
ASSERT_TRUE(res.ok());
- res = delta_writer->close_wait(PSlaveTabletNodes(), false);
+ res = delta_writer->wait_flush();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->build_rowset();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->submit_calc_delete_bitmap_task();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->wait_calc_delete_bitmap();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
ASSERT_TRUE(res.ok());
// publish version success
@@ -767,4 +772,243 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
delete delta_writer;
}
+// Two delta writers load into the same merge-on-write tablet that has a
+// sequence column. Writer 1 is committed and published first; writer 2 then
+// builds its rowset and calculates its delete bitmap against the published
+// data. The test checks the calculated bitmap and finally reads both rowsets
+// back to verify which rows survive.
+TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
+    TCreateTabletReq request;
+    create_tablet_request_with_sequence_col(10005, 270068377, &request, true);
+    Status res = k_engine->create_tablet(request);
+    ASSERT_TRUE(res.ok());
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    OlapTableSchemaParam param;
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {
+            10005,   270068377,  WriteType::LOAD,        20003, 30003,
+            load_id, tuple_desc, &(tuple_desc->slots()), false, &param};
+    DeltaWriter* delta_writer1 = nullptr;
+    DeltaWriter* delta_writer2 = nullptr;
+    std::unique_ptr<RuntimeProfile> profile1;
+    profile1 = std::make_unique<RuntimeProfile>("LoadChannels1");
+    std::unique_ptr<RuntimeProfile> profile2;
+    profile2 = std::make_unique<RuntimeProfile>("LoadChannels2");
+    DeltaWriter::open(&write_req, &delta_writer1, profile1.get(), TUniqueId());
+    DeltaWriter::open(&write_req, &delta_writer2, profile2.get(), TUniqueId());
+    ASSERT_NE(delta_writer1, nullptr);
+    ASSERT_NE(delta_writer2, nullptr);
+
+    // write data in delta writer 1 and run it through commit_txn
+    {
+        vectorized::Block block;
+        for (const auto& slot_desc : tuple_desc->slots()) {
+            block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                           slot_desc->get_data_type_ptr(),
+                                                           slot_desc->col_name()));
+        }
+
+        generate_data(&block, 10, 123, 100);
+        res = delta_writer1->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+
+        generate_data(&block, 20, 123, 100);
+        res = delta_writer1->write(&block, {1});
+        ASSERT_TRUE(res.ok());
+
+        res = delta_writer1->close();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->wait_flush();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->build_rowset();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->submit_calc_delete_bitmap_task();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->wait_calc_delete_bitmap();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->commit_txn(PSlaveTabletNodes(), false);
+        ASSERT_TRUE(res.ok());
+    }
+    // write data in delta writer 2; only close and flush it here, its rowset
+    // is built after writer 1 has been published
+    {
+        vectorized::Block block;
+        for (const auto& slot_desc : tuple_desc->slots()) {
+            block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                           slot_desc->get_data_type_ptr(),
+                                                           slot_desc->col_name()));
+        }
+
+        generate_data(&block, 10, 123, 110);
+        res = delta_writer2->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+
+        generate_data(&block, 20, 123, 90);
+        res = delta_writer2->write(&block, {1});
+        ASSERT_TRUE(res.ok());
+
+        res = delta_writer2->close();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer2->wait_flush();
+        ASSERT_TRUE(res.ok());
+    }
+    TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
+    std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl;
+    OlapMeta* meta = tablet->data_dir()->get_meta();
+    RowsetSharedPtr rowset1 = nullptr;
+    RowsetSharedPtr rowset2 = nullptr;
+
+    // publish version on delta writer 1 success
+    {
+        Version version;
+        version.first = tablet->rowset_with_max_version()->end_version() + 1;
+        version.second = tablet->rowset_with_max_version()->end_version() + 1;
+        std::cout << "start to add rowset version:" << version.first << "-" << version.second
+                  << std::endl;
+        std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+        StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+                write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+        ASSERT_EQ(1, tablet_related_rs.size());
+
+        std::cout << "start to publish txn" << std::endl;
+        rowset1 = tablet_related_rs.begin()->second;
+        TabletPublishStatistics pstats;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_related_rs.begin()->first.tablet_uid,
+                                                   version, &pstats);
+        ASSERT_TRUE(res.ok());
+        std::cout << "start to add inc rowset:" << rowset1->rowset_id()
+                  << ", num rows:" << rowset1->num_rows()
+                  << ", version:" << rowset1->version().first << "-" << rowset1->version().second
+                  << std::endl;
+        res = tablet->add_inc_rowset(rowset1);
+        ASSERT_TRUE(res.ok());
+        ASSERT_EQ(2, tablet->num_rows());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        res = ((BetaRowset*)rowset1.get())->load_segments(&segments);
+        ASSERT_TRUE(res.ok());
+        ASSERT_EQ(1, rowset1->num_segments());
+        ASSERT_EQ(1, segments.size());
+    }
+
+    // commit delta writer2, then publish it.
+    {
+        // commit, calc delete bitmap should happen here
+        res = delta_writer2->build_rowset();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer2->submit_calc_delete_bitmap_task();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer2->wait_calc_delete_bitmap();
+        ASSERT_TRUE(res.ok());
+
+        // verify that delete bitmap calculated correctly
+        // since the delete bitmap not published, versions are 0
+        auto delete_bitmap = delta_writer2->get_delete_bitmap();
+        ASSERT_TRUE(delete_bitmap->contains({rowset1->rowset_id(), 0, 0}, 0));
+        // The rowset id of rowset2 is not available yet; that the delete bitmap
+        // also contains row 1 of rowset2 is checked further down, right after
+        // rowset2 has been fetched from the txn manager.
+
+        res = delta_writer2->commit_txn(PSlaveTabletNodes(), false);
+        ASSERT_TRUE(res.ok());
+
+        Version version;
+        version.first = tablet->rowset_with_max_version()->end_version() + 1;
+        version.second = tablet->rowset_with_max_version()->end_version() + 1;
+        std::cout << "start to add rowset version:" << version.first << "-" << version.second
+                  << std::endl;
+        std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+        StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+                write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+        ASSERT_EQ(1, tablet_related_rs.size());
+
+        std::cout << "start to publish txn" << std::endl;
+        rowset2 = tablet_related_rs.begin()->second;
+        ASSERT_TRUE(delete_bitmap->contains({rowset2->rowset_id(), 0, 0}, 1));
+
+        TabletPublishStatistics pstats;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_related_rs.begin()->first.tablet_uid,
+                                                   version, &pstats);
+        ASSERT_TRUE(res.ok());
+        std::cout << "start to add inc rowset:" << rowset2->rowset_id()
+                  << ", num rows:" << rowset2->num_rows()
+                  << ", version:" << rowset2->version().first << "-" << rowset2->version().second
+                  << std::endl;
+        res = tablet->add_inc_rowset(rowset2);
+        ASSERT_TRUE(res.ok());
+        ASSERT_EQ(4, tablet->num_rows());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        res = ((BetaRowset*)rowset2.get())->load_segments(&segments);
+        ASSERT_TRUE(res.ok());
+        ASSERT_EQ(1, rowset2->num_segments());
+        ASSERT_EQ(1, segments.size());
+    }
+
+    auto cur_version = tablet->rowset_with_max_version()->end_version();
+    // read data from rowset 1, verify the data correct
+    {
+        OlapReaderStatistics stats;
+        StorageReadOptions opts;
+        opts.stats = &stats;
+        opts.tablet_schema = rowset1->tablet_schema();
+        opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg(
+                                              {rowset1->rowset_id(), 0, cur_version}));
+        std::unique_ptr<RowwiseIterator> iter;
+        std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset1->tablet_schema());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        // segments[0] is used below, so make sure loading actually succeeded
+        res = ((BetaRowset*)rowset1.get())->load_segments(&segments);
+        ASSERT_TRUE(res.ok());
+        ASSERT_FALSE(segments.empty());
+        auto s = segments[0]->new_iterator(schema, opts, &iter);
+        ASSERT_TRUE(s.ok());
+        auto read_block = rowset1->tablet_schema()->create_block();
+        res = iter->next_batch(&read_block);
+        ASSERT_TRUE(res.ok());
+        // key of (10, 123) is deleted
+        ASSERT_EQ(1, read_block.rows());
+        auto k1 = read_block.get_by_position(0).column->get_int(0);
+        ASSERT_EQ(20, k1);
+        auto k2 = read_block.get_by_position(1).column->get_int(0);
+        ASSERT_EQ(123, k2);
+        // get the value from sequence column
+        auto seq_v = read_block.get_by_position(4).column->get_int(0);
+        ASSERT_EQ(100, seq_v);
+    }
+
+    // read data from rowset 2, verify the data correct
+    {
+        OlapReaderStatistics stats;
+        StorageReadOptions opts;
+        opts.stats = &stats;
+        opts.tablet_schema = rowset2->tablet_schema();
+        opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg(
+                                              {rowset2->rowset_id(), 0, cur_version}));
+        std::unique_ptr<RowwiseIterator> iter;
+        std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset2->tablet_schema());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        // segments[0] is used below, so make sure loading actually succeeded
+        res = ((BetaRowset*)rowset2.get())->load_segments(&segments);
+        ASSERT_TRUE(res.ok());
+        ASSERT_FALSE(segments.empty());
+        auto s = segments[0]->new_iterator(schema, opts, &iter);
+        ASSERT_TRUE(s.ok());
+        auto read_block = rowset2->tablet_schema()->create_block();
+        res = iter->next_batch(&read_block);
+        ASSERT_TRUE(res.ok());
+        // key of (20, 123) is deleted, because its seq value is lower
+        ASSERT_EQ(1, read_block.rows());
+        auto k1 = read_block.get_by_position(0).column->get_int(0);
+        ASSERT_EQ(10, k1);
+        auto k2 = read_block.get_by_position(1).column->get_int(0);
+        ASSERT_EQ(123, k2);
+        // get the value from sequence column
+        auto seq_v = read_block.get_by_position(4).column->get_int(0);
+        ASSERT_EQ(110, seq_v);
+    }
+
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
+    ASSERT_TRUE(res.ok());
+    delete delta_writer1;
+    delete delta_writer2;
+}
} // namespace doris
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp
b/be/test/olap/engine_storage_migration_task_test.cpp
index 79fbf26203..5e2312cb0d 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -193,7 +193,9 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration)
{
res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
- res = delta_writer->close_wait(PSlaveTabletNodes(), false);
+ res = delta_writer->build_rowset();
+ EXPECT_EQ(Status::OK(), res);
+ res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
EXPECT_EQ(Status::OK(), res);
// publish version success
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index 431342c4f2..509425cb15 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -393,7 +393,9 @@ void createTablet(StorageEngine* engine, TabletSharedPtr*
tablet, int64_t replic
st = delta_writer->close();
ASSERT_EQ(Status::OK(), st);
- st = delta_writer->close_wait(PSlaveTabletNodes(), false);
+ st = delta_writer->build_rowset();
+ ASSERT_EQ(Status::OK(), st);
+ st = delta_writer->commit_txn(PSlaveTabletNodes(), false);
ASSERT_EQ(Status::OK(), st);
delete delta_writer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]