This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1f631c388d [enhance](cooldown)accelerate cooldown task produce
efficiency (#16089)
1f631c388d is described below
commit 1f631c388ddc6ecbddbf10de50f61d52d69c2932
Author: AlexYue <[email protected]>
AuthorDate: Fri Feb 10 16:58:27 2023 +0800
[enhance](cooldown)accelerate cooldown task produce efficiency (#16089)
---
.../exec/schema_scanner/schema_rowsets_scanner.cpp | 9 ---
be/src/http/action/pad_rowset_action.cpp | 1 -
be/src/olap/compaction.cpp | 2 -
be/src/olap/compaction.h | 1 -
be/src/olap/delta_writer.cpp | 1 -
be/src/olap/olap_server.cpp | 64 +++++++++-------------
be/src/olap/push_handler.cpp | 1 -
be/src/olap/rowset/beta_rowset_reader.h | 1 -
be/src/olap/rowset/beta_rowset_writer.cpp | 9 ---
be/src/olap/rowset/rowset.h | 2 +-
be/src/olap/rowset/rowset_meta.h | 6 --
be/src/olap/rowset/rowset_reader.h | 1 -
be/src/olap/rowset/rowset_writer_context.h | 1 -
be/src/olap/schema_change.cpp | 10 +---
be/src/olap/schema_change.h | 7 +--
be/src/olap/snapshot_manager.cpp | 1 -
be/src/olap/storage_engine.h | 3 +-
be/src/olap/tablet.cpp | 20 +------
be/src/olap/tablet_manager.cpp | 56 ++++++++++---------
be/src/olap/tablet_manager.h | 3 +-
be/src/util/threadpool.cpp | 6 --
be/src/util/threadpool.h | 18 +++++-
be/test/olap/tablet_test.cpp | 26 +++++----
docs/en/docs/admin-manual/system-table/rowsets.md | 1 -
.../docs/admin-manual/system-table/rowsets.md | 1 -
.../java/org/apache/doris/catalog/SchemaTable.java | 1 -
gensrc/proto/olap_file.proto | 4 +-
27 files changed, 101 insertions(+), 155 deletions(-)
diff --git a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
index 2744531bd7..5ee608acbb 100644
--- a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
@@ -44,7 +44,6 @@ std::vector<SchemaScanner::ColumnDesc>
SchemaRowsetsScanner::_s_tbls_columns = {
{"INDEX_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
{"DATA_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
{"CREATION_TIME", TYPE_BIGINT, sizeof(int64_t), true},
- {"OLDEST_WRITE_TIMESTAMP", TYPE_BIGINT, sizeof(int64_t), true},
{"NEWEST_WRITE_TIMESTAMP", TYPE_BIGINT, sizeof(int64_t), true},
};
@@ -189,14 +188,6 @@ Status
SchemaRowsetsScanner::_fill_block_impl(vectorized::Block* block) {
fill_dest_column(block, &src, _s_tbls_columns[10]);
}
}
- // OLDEST_WRITE_TIMESTAMP
- {
- for (int i = fill_idx_begin; i < fill_idx_end; ++i) {
- RowsetSharedPtr rowset = rowsets_[i];
- size_t src = rowset->oldest_write_timestamp();
- fill_dest_column(block, &src, _s_tbls_columns[11]);
- }
- }
// NEWEST_WRITE_TIMESTAMP
{
for (int i = fill_idx_begin; i < fill_idx_end; ++i) {
diff --git a/be/src/http/action/pad_rowset_action.cpp
b/be/src/http/action/pad_rowset_action.cpp
index 225fec4dd2..c353b4fad7 100644
--- a/be/src/http/action/pad_rowset_action.cpp
+++ b/be/src/http/action/pad_rowset_action.cpp
@@ -91,7 +91,6 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet,
const Version& versi
ctx.rowset_state = VISIBLE;
ctx.segments_overlap = NONOVERLAPPING;
ctx.tablet_schema = tablet->tablet_schema();
- ctx.oldest_write_timestamp = UnixSeconds();
ctx.newest_write_timestamp = UnixSeconds();
RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer));
auto rowset = writer->build();
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index e15d263ded..f968f219fe 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -184,7 +184,6 @@ void Compaction::build_basic_info() {
_output_version =
Version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
- _oldest_write_timestamp = _input_rowsets.front()->oldest_write_timestamp();
_newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp();
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
@@ -364,7 +363,6 @@ Status Compaction::construct_output_rowset_writer(bool
is_vertical) {
ctx.rowset_state = VISIBLE;
ctx.segments_overlap = NONOVERLAPPING;
ctx.tablet_schema = _cur_tablet_schema;
- ctx.oldest_write_timestamp = _oldest_write_timestamp;
ctx.newest_write_timestamp = _newest_write_timestamp;
if (is_vertical) {
return _tablet->create_vertical_rowset_writer(ctx, &_output_rs_writer);
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 11ea2e1ec1..8501df9ef4 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -105,7 +105,6 @@ protected:
Version _output_version;
- int64_t _oldest_write_timestamp;
int64_t _newest_write_timestamp;
RowIdConversion _rowid_conversion;
TabletSchemaSPtr _cur_tablet_schema;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index ee0b89570a..678d5006a0 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -137,7 +137,6 @@ Status DeltaWriter::init() {
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
context.tablet_schema = _tablet_schema;
- context.oldest_write_timestamp = UnixSeconds();
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = _tablet->table_id();
context.is_direct_write = true;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 9764680ff9..2bfffbbf29 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -692,60 +692,48 @@ Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer,
void StorageEngine::_cooldown_tasks_producer_callback() {
int64_t interval = config::generate_cooldown_task_interval_sec;
do {
- if (_cooldown_thread_pool->get_queue_size() > 0) {
- continue;
- }
+ // these tablets are ordered by priority desc
std::vector<TabletSharedPtr> tablets;
// TODO(luwei) : a more efficient way to get cooldown tablets
- _tablet_manager->get_cooldown_tablets(&tablets);
+ // keep (return true) only RUNNING tablets that have no cooldown task pending or running
+ auto is_candidate = [this](const TabletSharedPtr& tablet) -> bool {
+ std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
+ return TABLET_RUNNING == tablet->tablet_state() &&
+ _running_cooldown_tablets.find(tablet->tablet_id()) ==
+ _running_cooldown_tablets.end();
+ };
+ _tablet_manager->get_cooldown_tablets(&tablets, std::move(is_candidate));
LOG(INFO) << "cooldown producer get tablet num: " << tablets.size();
+ int max_priority = static_cast<int>(tablets.size());
for (const auto& tablet : tablets) {
- Status st = _cooldown_thread_pool->submit_func([tablet, tablets, this]() {
+ {
+ std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
+ _running_cooldown_tablets.insert(tablet->tablet_id());
+ }
+ PriorityThreadPool::Task task;
+ task.work_function = [tablet, task_size = tablets.size(), this]() {
+ Status st = tablet->cooldown();
{
- // Cooldown tasks on the same tablet cannot be executed concurrently
std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
- auto it = _running_cooldown_tablets.find(tablet->tablet_id());
- if (it != _running_cooldown_tablets.end()) {
- return;
- }
-
- // the number of concurrent cooldown tasks in each directory
- // cannot exceed the configured value
- auto dir_it = _running_cooldown_tasks_cnt.find(tablet->data_dir());
- if (dir_it != _running_cooldown_tasks_cnt.end() &&
- dir_it->second >= config::concurrency_per_dir) {
- return;
- }
-
- _running_cooldown_tablets.insert(tablet->tablet_id());
- dir_it = _running_cooldown_tasks_cnt.find(tablet->data_dir());
- if (dir_it != _running_cooldown_tasks_cnt.end()) {
- _running_cooldown_tasks_cnt[tablet->data_dir()]++;
- } else {
- _running_cooldown_tasks_cnt[tablet->data_dir()] = 1;
- }
+ _running_cooldown_tablets.erase(tablet->tablet_id());
}
-
- Status st = tablet->cooldown();
if (!st.ok()) {
LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id()
<< " err: " << st;
} else {
LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id()
<< " cooldown progress ("
- << tablets.size() - _cooldown_thread_pool->get_queue_size() << "/"
- << tablets.size() << ")";
+ << task_size - _cooldown_thread_pool->get_queue_size() << "/"
+ << task_size << ")";
}
+ };
+ task.priority = max_priority--;
+ bool submitted = _cooldown_thread_pool->offer(std::move(task));
- {
- std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
- _running_cooldown_tasks_cnt[tablet->data_dir()]--;
- _running_cooldown_tablets.erase(tablet->tablet_id());
- }
- });
-
- if (!st.ok()) {
- LOG(INFO) << "failed to submit cooldown task, err msg: " << st;
+ if (!submitted) {
+ LOG(WARNING) << "failed to submit cooldown task" << ", tablet: " << tablet->tablet_id();
+ std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
+ _running_cooldown_tablets.erase(tablet->tablet_id());
}
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 56c202a300..6ed44ea253 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -197,7 +197,6 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet,
RowsetSharedPtr* cur
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAP_UNKNOWN;
context.tablet_schema = tablet_schema;
- context.oldest_write_timestamp = UnixSeconds();
context.newest_write_timestamp = UnixSeconds();
res = cur_tablet->create_rowset_writer(context, &rowset_writer);
if (!res.ok()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index 53bc73c156..e5223e5241 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -45,7 +45,6 @@ public:
Version version() override { return _rowset->version(); }
- int64_t oldest_write_timestamp() override { return
_rowset->oldest_write_timestamp(); }
int64_t newest_write_timestamp() override { return
_rowset->newest_write_timestamp(); }
RowsetSharedPtr rowset() override { return
std::dynamic_pointer_cast<Rowset>(_rowset); }
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 7417af83e0..6d96b8445b 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -94,7 +94,6 @@ Status BetaRowsetWriter::init(const RowsetWriterContext&
rowset_writer_context)
_rowset_meta->set_load_id(_context.load_id);
} else {
_rowset_meta->set_version(_context.version);
-
_rowset_meta->set_oldest_write_timestamp(_context.oldest_write_timestamp);
_rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
}
_rowset_meta->set_tablet_uid(_context.tablet_uid);
@@ -646,10 +645,6 @@ Status BetaRowsetWriter::_wait_flying_segcompaction() {
}
RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr&
spec_rowset_meta) {
- if (_rowset_meta->oldest_write_timestamp() == -1) {
- _rowset_meta->set_oldest_write_timestamp(UnixSeconds());
- }
-
if (_rowset_meta->newest_write_timestamp() == -1) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
}
@@ -701,10 +696,6 @@ RowsetSharedPtr BetaRowsetWriter::build() {
DCHECK(_segment_writer == nullptr) << "segment must be null when build
rowset";
_build_rowset_meta(_rowset_meta);
- if (_rowset_meta->oldest_write_timestamp() == -1) {
- _rowset_meta->set_oldest_write_timestamp(UnixSeconds());
- }
-
if (_rowset_meta->newest_write_timestamp() == -1) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
}
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 5fcc599bf8..196de0cfbb 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -148,7 +148,7 @@ public:
int64_t num_segments() const { return rowset_meta()->num_segments(); }
void to_rowset_pb(RowsetMetaPB* rs_meta) const { return
rowset_meta()->to_rowset_pb(rs_meta); }
RowsetMetaPB get_rowset_pb() const { return
rowset_meta()->get_rowset_pb(); }
- int64_t oldest_write_timestamp() const { return
rowset_meta()->oldest_write_timestamp(); }
+ // The writing time of the newest data in rowset, to measure the freshness
of a rowset.
int64_t newest_write_timestamp() const { return
rowset_meta()->newest_write_timestamp(); }
bool is_segments_overlapping() const { return
rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index bb122c9834..950323f152 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -354,16 +354,10 @@ public:
}
}
- void set_oldest_write_timestamp(int64_t timestamp) {
- _rowset_meta_pb.set_oldest_write_timestamp(timestamp);
- }
-
void set_newest_write_timestamp(int64_t timestamp) {
_rowset_meta_pb.set_newest_write_timestamp(timestamp);
}
- int64_t oldest_write_timestamp() const { return
_rowset_meta_pb.oldest_write_timestamp(); }
-
int64_t newest_write_timestamp() const { return
_rowset_meta_pb.newest_write_timestamp(); }
void set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
diff --git a/be/src/olap/rowset/rowset_reader.h
b/be/src/olap/rowset/rowset_reader.h
index 4186088fe5..4d979c6e21 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -63,7 +63,6 @@ public:
virtual RowsetTypePB type() const = 0;
- virtual int64_t oldest_write_timestamp() = 0;
virtual int64_t newest_write_timestamp() = 0;
virtual Status current_block_row_locations(std::vector<RowLocation>*
locations) {
return Status::NotSupported("to be implemented");
diff --git a/be/src/olap/rowset/rowset_writer_context.h
b/be/src/olap/rowset/rowset_writer_context.h
index 60f569e362..8669a072de 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -73,7 +73,6 @@ struct RowsetWriterContext {
// (because it hard to refactor, and RowsetConvertor will be deprecated in
future)
DataDir* data_dir = nullptr;
- int64_t oldest_write_timestamp;
int64_t newest_write_timestamp;
bool enable_unique_key_merge_on_write = false;
std::set<int32_t> skip_inverted_index;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 63a9a211eb..1efe95e11f 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -460,7 +460,6 @@ Status
VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
RowsetSharedPtr rowset = rowset_reader->rowset();
SegmentsOverlapPB segments_overlap =
rowset->rowset_meta()->segments_overlap();
- int64_t oldest_write_timestamp = rowset->oldest_write_timestamp();
int64_t newest_write_timestamp = rowset->newest_write_timestamp();
_temp_delta_versions.first = _temp_delta_versions.second;
@@ -472,8 +471,7 @@ Status
VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
RowsetSharedPtr rowset;
RETURN_IF_ERROR(_internal_sorting(
blocks, Version(_temp_delta_versions.second,
_temp_delta_versions.second),
- oldest_write_timestamp, newest_write_timestamp, new_tablet,
BETA_ROWSET,
- segments_overlap, &rowset));
+ newest_write_timestamp, new_tablet, BETA_ROWSET,
segments_overlap, &rowset));
src_rowsets.push_back(rowset);
for (auto& block : blocks) {
@@ -529,8 +527,8 @@ Status
VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
Status VSchemaChangeWithSorting::_internal_sorting(
const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const
Version& version,
- int64_t oldest_write_timestamp, int64_t newest_write_timestamp,
TabletSharedPtr new_tablet,
- RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap,
RowsetSharedPtr* rowset) {
+ int64_t newest_write_timestamp, TabletSharedPtr new_tablet,
RowsetTypePB new_rowset_type,
+ SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) {
uint64_t merged_rows = 0;
MultiBlockMerger merger(new_tablet);
@@ -540,7 +538,6 @@ Status VSchemaChangeWithSorting::_internal_sorting(
context.rowset_state = VISIBLE;
context.segments_overlap = segments_overlap;
context.tablet_schema = new_tablet->tablet_schema();
- context.oldest_write_timestamp = oldest_write_timestamp;
context.newest_write_timestamp = newest_write_timestamp;
RETURN_IF_ERROR(new_tablet->create_rowset_writer(context, &rowset_writer));
@@ -1051,7 +1048,6 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
context.rowset_state = VISIBLE;
context.segments_overlap =
rs_reader->rowset()->rowset_meta()->segments_overlap();
context.tablet_schema = new_tablet->tablet_schema();
- context.oldest_write_timestamp = rs_reader->oldest_write_timestamp();
context.newest_write_timestamp = rs_reader->newest_write_timestamp();
context.fs = rs_reader->rowset()->rowset_meta()->fs();
Status status = new_tablet->create_rowset_writer(context,
&rowset_writer);
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 099d1af81d..7920186a1f 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -158,10 +158,9 @@ private:
TabletSharedPtr new_tablet, TabletSchemaSPtr
base_tablet_schema) override;
Status _internal_sorting(const
std::vector<std::unique_ptr<vectorized::Block>>& blocks,
- const Version& temp_delta_versions, int64_t
oldest_write_timestamp,
- int64_t newest_write_timestamp, TabletSharedPtr
new_tablet,
- RowsetTypePB new_rowset_type, SegmentsOverlapPB
segments_overlap,
- RowsetSharedPtr* rowset);
+ const Version& temp_delta_versions, int64_t
newest_write_timestamp,
+ TabletSharedPtr new_tablet, RowsetTypePB
new_rowset_type,
+ SegmentsOverlapPB segments_overlap,
RowsetSharedPtr* rowset);
Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet);
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index b6288530e9..d7166dd97d 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -261,7 +261,6 @@ Status SnapshotManager::_rename_rowset_id(const
RowsetMetaPB& rs_meta_pb,
org_rowset_meta->tablet_schema() ?
org_rowset_meta->tablet_schema() : tablet_schema;
context.rowset_state = org_rowset_meta->rowset_state();
context.version = org_rowset_meta->version();
- context.oldest_write_timestamp = org_rowset_meta->oldest_write_timestamp();
context.newest_write_timestamp = org_rowset_meta->newest_write_timestamp();
// keep segments_overlap same as origin rowset
context.segments_overlap = rowset_meta->segments_overlap();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6ccd446fae..b0a6b63b41 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -395,10 +395,9 @@ private:
scoped_refptr<Thread> _cache_file_cleaner_tasks_producer_thread;
- std::unique_ptr<ThreadPool> _cooldown_thread_pool;
+ std::unique_ptr<PriorityThreadPool> _cooldown_thread_pool;
std::mutex _running_cooldown_mutex;
- std::unordered_map<DataDir*, int64_t> _running_cooldown_tasks_cnt;
std::unordered_set<int64_t> _running_cooldown_tablets;
DISALLOW_COPY_AND_ASSIGN(StorageEngine);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 92b203b339..32c0cc3d98 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1556,7 +1556,6 @@ Status Tablet::create_initial_rowset(const int64_t
req_version) {
context.rowset_state = VISIBLE;
context.segments_overlap = OVERLAP_UNKNOWN;
context.tablet_schema = tablet_schema();
- context.oldest_write_timestamp = UnixSeconds();
context.newest_write_timestamp = UnixSeconds();
res = create_rowset_writer(context, &rs_writer);
@@ -1890,14 +1889,6 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp,
size_t* file_size) {
return false;
}
- int64_t oldest_cooldown_time = std::numeric_limits<int64_t>::max();
- if (cooldown_ttl_sec >= 0) {
- oldest_cooldown_time = rowset->oldest_write_timestamp() +
cooldown_ttl_sec;
- }
- if (cooldown_datetime > 0) {
- oldest_cooldown_time = std::min(oldest_cooldown_time,
cooldown_datetime);
- }
-
int64_t newest_cooldown_time = std::numeric_limits<int64_t>::max();
if (cooldown_ttl_sec >= 0) {
newest_cooldown_time = rowset->newest_write_timestamp() +
cooldown_ttl_sec;
@@ -1906,14 +1897,10 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp,
size_t* file_size) {
newest_cooldown_time = std::min(newest_cooldown_time,
cooldown_datetime);
}
- if (oldest_cooldown_time + config::cooldown_lag_time_sec < UnixSeconds()) {
- *cooldown_timestamp = oldest_cooldown_time;
- VLOG_DEBUG << "tablet need cooldown, tablet id: " << tablet_id()
- << " cooldown_timestamp: " << *cooldown_timestamp;
- return true;
- }
-
+ // the rowset should do cooldown job only if its cooldown ttl plus newest write time is less than
+ // current time or its cooldown datetime is less than current time
if (newest_cooldown_time < UnixSeconds()) {
+ *cooldown_timestamp = newest_cooldown_time;
*file_size = rowset->data_disk_size();
VLOG_DEBUG << "tablet need cooldown, tablet id: " << tablet_id()
<< " file_size: " << *file_size;
@@ -1922,7 +1909,6 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp,
size_t* file_size) {
VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << tablet_id()
<< " ttl sec: " << cooldown_ttl_sec << " cooldown datetime: "
<< cooldown_datetime
- << " oldest write time: " << rowset->oldest_write_timestamp()
<< " newest write time: " << rowset->newest_write_timestamp();
return false;
}
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 87334cbfe2..407fb78520 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1301,39 +1301,43 @@ struct SortCtx {
SortCtx(TabletSharedPtr tablet, int64_t cooldown_timestamp, int64_t
file_size)
: tablet(tablet), cooldown_timestamp(cooldown_timestamp),
file_size(file_size) {}
TabletSharedPtr tablet;
- int64_t cooldown_timestamp;
+ // to ensure the tablet with -1 would always be greater than other
+ uint64_t cooldown_timestamp;
int64_t file_size;
+ bool operator<(const SortCtx& other) const {
+ if (this->cooldown_timestamp == other.cooldown_timestamp) {
+ return this->file_size > other.file_size;
+ }
+ return this->cooldown_timestamp < other.cooldown_timestamp;
+ }
};
-void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>*
tablets) {
+void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
+ std::function<bool(const
TabletSharedPtr&)> skip_tablet) {
std::vector<SortCtx> sort_ctx_vec;
+ std::vector<std::weak_ptr<Tablet>> candidates;
for (const auto& tablets_shard : _tablets_shards) {
std::shared_lock rdlock(tablets_shard.lock);
- for (const auto& item : tablets_shard.tablet_map) {
- const TabletSharedPtr& tablet = item.second;
- int64_t cooldown_timestamp = -1;
- size_t file_size = -1;
- if (tablet->need_cooldown(&cooldown_timestamp, &file_size)) {
- sort_ctx_vec.emplace_back(tablet, cooldown_timestamp,
file_size);
- }
- }
- }
-
- std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end(), [](SortCtx a, SortCtx
b) {
- if (a.cooldown_timestamp != -1 && b.cooldown_timestamp != -1) {
- return a.cooldown_timestamp < b.cooldown_timestamp;
- }
-
- if (a.cooldown_timestamp != -1 && b.cooldown_timestamp == -1) {
- return true;
- }
-
- if (a.cooldown_timestamp == -1 && b.cooldown_timestamp != -1) {
- return false;
- }
+ std::for_each(
+ tablets_shard.tablet_map.begin(),
tablets_shard.tablet_map.end(),
+ [&candidates](auto& tablet_pair) {
candidates.emplace_back(tablet_pair.second); });
+ }
+ std::for_each(
+ candidates.begin(), candidates.end(),
+ [&sort_ctx_vec, &skip_tablet](std::weak_ptr<Tablet>& t) {
+ const TabletSharedPtr& tablet = t.lock();
+ if (UNLIKELY(nullptr == tablet)) {
+ return;
+ }
+ std::shared_lock rdlock(tablet->get_header_lock());
+ int64_t cooldown_timestamp = -1;
+ size_t file_size = -1;
+ if (skip_tablet(tablet) &&
tablet->need_cooldown(&cooldown_timestamp, &file_size)) {
+ sort_ctx_vec.emplace_back(tablet, cooldown_timestamp,
file_size);
+ }
+ });
- return a.file_size > b.file_size;
- });
+ std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end());
for (SortCtx& ctx : sort_ctx_vec) {
VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id();
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 667013d268..7ae4060142 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -139,7 +139,8 @@ public:
void get_tablets_distribution_on_different_disks(
std::map<int64_t, std::map<DataDir*, int64_t>>&
tablets_num_on_disk,
std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>&
tablets_info_on_disk);
- void get_cooldown_tablets(std::vector<TabletSharedPtr>* tables);
+ void get_cooldown_tablets(std::vector<TabletSharedPtr>* tables,
+ std::function<bool(const TabletSharedPtr&)>
skip_tablet);
void get_all_tablets_storage_format(TCheckStorageFormatResult* result);
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index ff90604017..321825c891 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -73,12 +73,6 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int
max_queue_size) {
return *this;
}
-Status ThreadPoolBuilder::build(std::unique_ptr<ThreadPool>* pool) const {
- pool->reset(new ThreadPool(*this));
- RETURN_IF_ERROR((*pool)->init());
- return Status::OK();
-}
-
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode
mode,
int max_concurrency)
: _mode(mode),
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index bf07e16846..89052274b0 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -27,11 +27,13 @@
#include <functional>
#include <memory>
#include <string>
+#include <type_traits>
#include <unordered_set>
#include <utility>
#include "common/status.h"
#include "gutil/ref_counted.h"
+#include "util/priority_thread_pool.hpp"
namespace doris {
@@ -105,7 +107,18 @@ public:
return *this;
}
// Instantiate a new ThreadPool with the existing builder arguments.
- Status build(std::unique_ptr<ThreadPool>* pool) const;
+ template <typename ThreadPoolType>
+ Status build(std::unique_ptr<ThreadPoolType>* pool) const {
+ if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
+ pool->reset(new ThreadPoolType(*this));
+ RETURN_IF_ERROR((*pool)->init());
+ } else if constexpr (std::is_same_v<ThreadPoolType,
PriorityThreadPool>) {
+ pool->reset(new ThreadPoolType(_max_threads, _max_queue_size,
_name));
+ } else {
+ static_assert(always_false_v<ThreadPoolType>, "Unsupported
ThreadPoolType");
+ }
+ return Status::OK();
+ }
private:
friend class ThreadPool;
@@ -117,6 +130,9 @@ private:
ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
void operator=(const ThreadPoolBuilder&) = delete;
+
+ template <typename T>
+ static constexpr bool always_false_v = false;
};
// Thread pool with a variable number of threads.
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index fe6435930a..9400e0e88c 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -111,10 +111,8 @@ public:
pb1->set_tablet_schema(_tablet_meta->tablet_schema());
}
- void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end,
int64_t earliest_ts,
- int64_t latest_ts) {
+ void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end,
int64_t latest_ts) {
pb1->init_from_json(_json_rowset_meta);
- pb1->set_oldest_write_timestamp(earliest_ts);
pb1->set_newest_write_timestamp(latest_ts);
pb1->set_start_version(start);
pb1->set_end_version(end);
@@ -282,27 +280,27 @@ TEST_F(TestTablet, pad_rowset) {
TEST_F(TestTablet, cooldown_policy) {
std::vector<RowsetMetaSharedPtr> rs_metas;
RowsetMetaSharedPtr ptr1(new RowsetMeta());
- init_rs_meta(ptr1, 0, 2, 100, 200);
+ init_rs_meta(ptr1, 0, 2, 200);
rs_metas.push_back(ptr1);
RowsetSharedPtr rowset1 = make_shared<BetaRowset>(nullptr, "", ptr1);
RowsetMetaSharedPtr ptr2(new RowsetMeta());
- init_rs_meta(ptr2, 3, 4, 300, 600);
+ init_rs_meta(ptr2, 3, 4, 600);
rs_metas.push_back(ptr2);
RowsetSharedPtr rowset2 = make_shared<BetaRowset>(nullptr, "", ptr2);
RowsetMetaSharedPtr ptr3(new RowsetMeta());
- init_rs_meta(ptr3, 5, 5, 800, 800);
+ init_rs_meta(ptr3, 5, 5, 800);
rs_metas.push_back(ptr3);
RowsetSharedPtr rowset3 = make_shared<BetaRowset>(nullptr, "", ptr3);
RowsetMetaSharedPtr ptr4(new RowsetMeta());
- init_rs_meta(ptr4, 6, 7, 1100, 1400);
+ init_rs_meta(ptr4, 6, 7, 1400);
rs_metas.push_back(ptr4);
RowsetSharedPtr rowset4 = make_shared<BetaRowset>(nullptr, "", ptr4);
RowsetMetaSharedPtr ptr5(new RowsetMeta());
- init_rs_meta(ptr5, 8, 9, 1800, 2000);
+ init_rs_meta(ptr5, 8, 9, 2000);
rs_metas.push_back(ptr5);
RowsetSharedPtr rowset5 = make_shared<BetaRowset>(nullptr, "", ptr5);
@@ -322,6 +320,7 @@ TEST_F(TestTablet, cooldown_policy) {
_tablet->_rs_version_map[ptr5->version()] = rowset5;
_tablet->set_cumulative_layer_point(20);
+ sleep(30);
{
auto storage_policy = std::make_shared<StoragePolicy>();
@@ -334,7 +333,7 @@ TEST_F(TestTablet, cooldown_policy) {
bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
ASSERT_TRUE(ret);
ASSERT_EQ(cooldown_timestamp, 250);
- ASSERT_EQ(file_size, -1);
+ ASSERT_EQ(file_size, 84699);
}
{
@@ -347,8 +346,8 @@ TEST_F(TestTablet, cooldown_policy) {
size_t file_size = -1;
bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
ASSERT_TRUE(ret);
- ASSERT_EQ(cooldown_timestamp, 3700);
- ASSERT_EQ(file_size, -1);
+ ASSERT_EQ(cooldown_timestamp, 3800);
+ ASSERT_EQ(file_size, 84699);
}
{
@@ -374,8 +373,11 @@ TEST_F(TestTablet, cooldown_policy) {
int64_t cooldown_timestamp = -1;
size_t file_size = -1;
bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+ // the rowset with earliest version would be picked up to do cooldown, of which the timestamp
+ // is UnixSeconds() - 250
+ int64_t expect_cooldown_timestamp = UnixSeconds() - 50;
ASSERT_TRUE(ret);
- ASSERT_EQ(cooldown_timestamp, -1);
+ ASSERT_EQ(cooldown_timestamp, expect_cooldown_timestamp);
ASSERT_EQ(file_size, 84699);
}
}
}
diff --git a/docs/en/docs/admin-manual/system-table/rowsets.md
b/docs/en/docs/admin-manual/system-table/rowsets.md
index 7c1664be25..2ad7c7a5c6 100644
--- a/docs/en/docs/admin-manual/system-table/rowsets.md
+++ b/docs/en/docs/admin-manual/system-table/rowsets.md
@@ -51,7 +51,6 @@ MySQL [(none)]> desc information_schema.rowsets;
| INDEX_DISK_SIZE | BIGINT | Yes | false | NULL | |
| DATA_DISK_SIZE | BIGINT | Yes | false | NULL | |
| CREATION_TIME | BIGINT | Yes | false | NULL | |
-| OLDEST_WRITE_TIMESTAMP | BIGINT | Yes | false | NULL | |
| NEWEST_WRITE_TIMESTAMP | BIGINT | Yes | false | NULL | |
+------------------------+------------+------+-------+---------+-------+
```
diff --git a/docs/zh-CN/docs/admin-manual/system-table/rowsets.md
b/docs/zh-CN/docs/admin-manual/system-table/rowsets.md
index c7579b0234..159c6dc288 100644
--- a/docs/zh-CN/docs/admin-manual/system-table/rowsets.md
+++ b/docs/zh-CN/docs/admin-manual/system-table/rowsets.md
@@ -55,7 +55,6 @@ MySQL [(none)]> desc information_schema.rowsets;
| INDEX_DISK_SIZE | BIGINT | Yes | false | NULL | |
| DATA_DISK_SIZE | BIGINT | Yes | false | NULL | |
| CREATION_TIME | BIGINT | Yes | false | NULL | |
-| OLDEST_WRITE_TIMESTAMP | BIGINT | Yes | false | NULL | |
| NEWEST_WRITE_TIMESTAMP | BIGINT | Yes | false | NULL | |
+------------------------+------------+------+-------+---------+-------+
```
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 9769006569..4f6ddb032e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -384,7 +384,6 @@ public class SchemaTable extends Table {
.column("INDEX_DISK_SIZE",
ScalarType.createType(PrimitiveType.BIGINT))
.column("DATA_DISK_SIZE",
ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATION_TIME",
ScalarType.createType(PrimitiveType.BIGINT))
- .column("OLDEST_WRITE_TIMESTAMP",
ScalarType.createType(PrimitiveType.BIGINT))
.column("NEWEST_WRITE_TIMESTAMP",
ScalarType.createType(PrimitiveType.BIGINT))
.build()))
.put("backends", new SchemaTable(SystemIdGenerator.getNextId(),
"backends", TableType.SCHEMA,
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 4452373335..4e65af4f06 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -100,8 +100,8 @@ message RowsetMetaPB {
optional string rowset_id_v2 = 23;
// resource id
optional string resource_id = 24;
- // earliest write time
- optional int64 oldest_write_timestamp = 25 [default = -1];
+ // used to be oldest write time: earliest write time
+ reserved 25;
// latest write time
optional int64 newest_write_timestamp = 26 [default = -1];
// the encoded segment min/max key of segments in this rowset,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]