This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 58c520dbfd [Feature](remote) Cooldown cold data to object storage only
one replica (#15832)
58c520dbfd is described below
commit 58c520dbfde1ce09c0a23a5ec2276ca687f6c0db
Author: pengxiangyu <[email protected]>
AuthorDate: Sat Jan 14 23:58:00 2023 +0800
[Feature](remote) Cooldown cold data to object storage only one replica
(#15832)
---
be/src/agent/agent_server.cpp | 11 +
be/src/agent/agent_server.h | 1 +
be/src/agent/task_worker_pool.cpp | 44 +++
be/src/agent/task_worker_pool.h | 6 +-
be/src/http/action/pad_rowset_action.cpp | 2 +
be/src/io/fs/file_reader_options.h | 14 +-
be/src/io/fs/remote_file_system.cpp | 3 +-
be/src/olap/delta_writer.cpp | 3 +
be/src/olap/push_handler.cpp | 4 +
be/src/olap/rowset/beta_rowset.cpp | 26 +-
be/src/olap/rowset/beta_rowset.h | 2 +
be/src/olap/rowset/beta_rowset_writer.cpp | 4 +-
be/src/olap/rowset/segment_v2/segment.cpp | 3 +-
be/src/olap/rowset/segment_v2/segment.h | 1 +
be/src/olap/tablet.cpp | 132 ++++++-
be/src/olap/tablet.h | 4 +
be/src/olap/tablet_meta.cpp | 18 +-
be/src/olap/tablet_meta.h | 15 +
be/test/io/cache/remote_file_cache_test.cpp | 4 +-
be/test/tools/benchmark_tool.cpp | 4 +-
.../org/apache/doris/common/FeMetaVersion.java | 4 +-
.../doris/alter/MaterializedViewHandler.java | 3 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 4 +-
.../apache/doris/alter/SchemaChangeHandler.java | 4 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 4 +-
.../java/org/apache/doris/backup/RestoreJob.java | 15 +-
.../apache/doris/catalog/CatalogRecycleBin.java | 6 +-
.../main/java/org/apache/doris/catalog/Env.java | 24 ++
.../main/java/org/apache/doris/catalog/Tablet.java | 42 ++-
.../apache/doris/catalog/TabletInvertedIndex.java | 55 ++-
.../java/org/apache/doris/catalog/TabletMeta.java | 24 +-
.../main/java/org/apache/doris/common/Config.java | 7 +
.../org/apache/doris/cooldown/CooldownConf.java | 130 +++++++
.../apache/doris/cooldown/CooldownException.java | 32 ++
.../org/apache/doris/cooldown/CooldownHandler.java | 185 ++++++++++
.../org/apache/doris/cooldown/CooldownJob.java | 397 +++++++++++++++++++++
.../apache/doris/datasource/InternalCatalog.java | 18 +-
.../org/apache/doris/journal/JournalEntity.java | 6 +
.../java/org/apache/doris/master/MasterImpl.java | 10 +
.../org/apache/doris/master/ReportHandler.java | 10 +-
.../java/org/apache/doris/persist/EditLog.java | 9 +
.../org/apache/doris/persist/OperationType.java | 2 +
.../doris/persist/meta/MetaPersistMethod.java | 6 +
.../doris/persist/meta/PersistMetaModules.java | 2 +-
.../java/org/apache/doris/task/AgentBatchTask.java | 10 +
.../apache/doris/task/PushCooldownConfTask.java | 52 +++
.../org/apache/doris/backup/CatalogMocker.java | 10 +-
.../org/apache/doris/catalog/CatalogTestUtil.java | 5 +-
.../java/org/apache/doris/catalog/TabletTest.java | 2 +-
.../doris/clone/ClusterLoadStatisticsTest.java | 6 +-
.../org/apache/doris/clone/RebalancerTestUtil.java | 2 +-
.../org/apache/doris/common/util/UnitTestUtil.java | 2 +-
.../org/apache/doris/cooldown/CooldownJobTest.java | 130 +++++++
.../org/apache/doris/http/DorisHttpTestCase.java | 3 +-
.../org/apache/doris/load/DeleteHandlerTest.java | 2 +-
gensrc/thrift/AgentService.thrift | 11 +
gensrc/thrift/MasterService.thrift | 3 +
gensrc/thrift/Types.thrift | 3 +-
58 files changed, 1459 insertions(+), 82 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index d57777b6e6..c0138a08e1 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -69,6 +69,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+
// Both PUSH and REALTIME_PUSH type use _push_workers
CREATE_AND_START_POOL(PUSH, _push_workers);
CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
@@ -84,6 +85,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
CREATE_AND_START_POOL(MOVE, _move_dir_workers);
CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO,
_update_tablet_meta_info_workers);
+ CREATE_AND_START_THREAD(PUSH_COOLDOWN_CONF, _push_cooldown_conf_workers);
CREATE_AND_START_THREAD(REPORT_TASK, _report_task_workers);
CREATE_AND_START_THREAD(REPORT_DISK_STATE, _report_disk_state_workers);
@@ -183,6 +185,15 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
signature);
}
break;
+ case TTaskType::PUSH_COOLDOWN_CONF:
+ if (task.__isset.push_cooldown_conf) {
+ _push_cooldown_conf_workers->submit_task(task);
+ } else {
+ ret_st = Status::InvalidArgument(
+ "task(signature={}) has wrong request member =
push_cooldown_conf",
+ signature);
+ }
+ break;
default:
ret_st = Status::InvalidArgument("task(signature={}, type={}) has
wrong task type",
signature, task_type);
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index bfb819dd97..3914f3a236 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -60,6 +60,7 @@ private:
std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
std::unique_ptr<TaskWorkerPool> _delete_workers;
std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
+ std::unique_ptr<TaskWorkerPool> _push_cooldown_conf_workers;
std::unique_ptr<TaskWorkerPool> _clone_workers;
std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
std::unique_ptr<TaskWorkerPool> _check_consistency_workers;
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 15a3914e6d..29bfd3631b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -204,6 +204,10 @@ void TaskWorkerPool::start() {
cb =
std::bind<void>(&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback,
this);
break;
+ case TaskWorkerType::PUSH_COOLDOWN_CONF:
+ _worker_count = 1;
+ cb =
std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback,
this);
+ break;
default:
// pass
break;
@@ -1693,4 +1697,44 @@ void
TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
}
}
+// Worker loop for PUSH_COOLDOWN_CONF tasks. Pops one request at a time and, for
+// every tablet named in it, adopts the pushed cooldown replica id when the
+// pushed term is strictly greater than the tablet's current term. Tablets that
+// cannot be found, or whose term is not older, are logged and left untouched.
+void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        TPushCooldownConfReq push_cooldown_conf_req;
+        {
+            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+            while (_is_work && _tasks.empty()) {
+                _worker_thread_condition_variable.wait(worker_thread_lock);
+            }
+            // The wait loop also ends when the pool is stopping; the queue may then
+            // be empty, and front()/pop_front() on an empty queue is undefined
+            // behaviour.
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            push_cooldown_conf_req = agent_task_req.push_cooldown_conf;
+            _tasks.pop_front();
+        }
+        // The entries are only read, so bind by const reference instead of copying.
+        for (const auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
+            int64_t tablet_id = cooldown_conf.tablet_id;
+            TabletSharedPtr tablet =
+                    StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+            if (tablet.get() == nullptr) {
+                LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
+                continue;
+            }
+            if (cooldown_conf.cooldown_term > tablet->tablet_meta()->cooldown_term()) {
+                tablet->tablet_meta()->set_cooldown_replica_id_and_term(
+                        cooldown_conf.cooldown_replica_id, cooldown_conf.cooldown_term);
+                LOG(INFO) << "push_cooldown_conf successfully. tablet_id=" << tablet_id
+                          << ", cooldown_conf: " << cooldown_conf.cooldown_replica_id << "("
+                          << cooldown_conf.cooldown_term << ")";
+            } else {
+                // Pushed term is not newer than the stored one: keep the current conf.
+                LOG(WARNING) << "push_cooldown_conf failed. tablet_id=" << tablet_id
+                             << ", cooldown_term: " << tablet->tablet_meta()->cooldown_term()
+                             << " -> " << cooldown_conf.cooldown_term;
+            }
+        }
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
} // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index ae8b9e8149..f94d8d2ad8 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -67,7 +67,8 @@ public:
UPDATE_TABLET_META_INFO,
SUBMIT_TABLE_COMPACTION,
REFRESH_STORAGE_POLICY,
- UPDATE_STORAGE_POLICY
+ UPDATE_STORAGE_POLICY,
+ PUSH_COOLDOWN_CONF
};
enum ReportType { TASK, DISK, TABLET };
@@ -127,6 +128,8 @@ public:
return "REFRESH_STORAGE_POLICY";
case UPDATE_STORAGE_POLICY:
return "UPDATE_STORAGE_POLICY";
+ case PUSH_COOLDOWN_CONF:
+ return "PUSH_COOLDOWN_CONF";
default:
return "Unknown";
}
@@ -192,6 +195,7 @@ private:
void _submit_table_compaction_worker_thread_callback();
void _storage_refresh_storage_policy_worker_thread_callback();
void _storage_update_storage_policy_worker_thread_callback();
+ void _push_cooldown_conf_worker_thread_callback();
void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t
signature,
const TTaskType::type task_type, TFinishTaskRequest*
finish_task_request);
diff --git a/be/src/http/action/pad_rowset_action.cpp
b/be/src/http/action/pad_rowset_action.cpp
index da513750a9..225fec4dd2 100644
--- a/be/src/http/action/pad_rowset_action.cpp
+++ b/be/src/http/action/pad_rowset_action.cpp
@@ -91,6 +91,8 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet,
const Version& versi
ctx.rowset_state = VISIBLE;
ctx.segments_overlap = NONOVERLAPPING;
ctx.tablet_schema = tablet->tablet_schema();
+ ctx.oldest_write_timestamp = UnixSeconds();
+ ctx.newest_write_timestamp = UnixSeconds();
RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer));
auto rowset = writer->build();
rowset->make_visible(version);
diff --git a/be/src/io/fs/file_reader_options.h
b/be/src/io/fs/file_reader_options.h
index f7cc0d13ab..f5a6eaa8f3 100644
--- a/be/src/io/fs/file_reader_options.h
+++ b/be/src/io/fs/file_reader_options.h
@@ -44,17 +44,19 @@ public:
class NoCachePathPolicy : public CachePathPolicy {
public:
NoCachePathPolicy() = default;
- std::string get_cache_path(const std::string& path) const override {
return path; }
+ std::string get_cache_path(const std::string& path) const override {
return ""; }
};
class SegmentCachePathPolicy : public CachePathPolicy {
public:
SegmentCachePathPolicy() = default;
- std::string get_cache_path(const std::string& path) const override {
- // the segment file path is
{rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}.dat
- // cache path is: {rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}/
- return path.substr(0, path.size() - 4) + "/";
- }
+
+ void set_cache_path(const std::string& cache_path) { _cache_path =
cache_path; }
+
+ std::string get_cache_path(const std::string& path) const override {
return _cache_path; }
+
+private:
+ std::string _cache_path;
};
class FileBlockCachePathPolicy : public CachePathPolicy {
diff --git a/be/src/io/fs/remote_file_system.cpp
b/be/src/io/fs/remote_file_system.cpp
index a2bfb93d66..563cc79fba 100644
--- a/be/src/io/fs/remote_file_system.cpp
+++ b/be/src/io/fs/remote_file_system.cpp
@@ -36,8 +36,7 @@ Status RemoteFileSystem::open_file(const Path& path, const
FileReaderOptions& re
}
case io::FileCachePolicy::SUB_FILE_CACHE:
case io::FileCachePolicy::WHOLE_FILE_CACHE: {
- StringPiece str(path.native());
- std::string cache_path =
reader_options.path_policy.get_cache_path(str.as_string());
+ std::string cache_path =
reader_options.path_policy.get_cache_path(path.native());
io::FileCachePtr cache_reader =
FileCacheManager::instance()->new_file_cache(
cache_path, config::file_cache_alive_time_sec, raw_reader,
reader_options.cache_type);
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index cd7178e1b4..c001b67cad 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -138,6 +138,9 @@ Status DeltaWriter::init() {
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
context.tablet_schema = _tablet_schema;
+ context.oldest_write_timestamp = UnixSeconds();
+ context.newest_write_timestamp = UnixSeconds();
+
RETURN_NOT_OK(_tablet->create_rowset_writer(context, &_rowset_writer));
_schema.reset(new Schema(_tablet_schema));
_reset_mem_table();
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 55472c6d82..ad3bb25e79 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -203,6 +203,8 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet,
RowsetSharedPtr* cur
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAP_UNKNOWN;
context.tablet_schema = tablet_schema;
+ context.oldest_write_timestamp = UnixSeconds();
+ context.newest_write_timestamp = UnixSeconds();
res = cur_tablet->create_rowset_writer(context, &rowset_writer);
if (!res.ok()) {
LOG(WARNING) << "failed to init rowset writer, tablet=" <<
cur_tablet->full_name()
@@ -366,6 +368,8 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet,
RowsetSharedPtr* cur_ro
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAP_UNKNOWN;
context.tablet_schema = tablet_schema;
+ context.oldest_write_timestamp = UnixSeconds();
+ context.newest_write_timestamp = UnixSeconds();
res = cur_tablet->create_rowset_writer(context, &rowset_writer);
if (!res.ok()) {
LOG(WARNING) << "failed to init rowset writer, tablet=" <<
cur_tablet->full_name()
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index e15725d811..300a022078 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -73,6 +73,11 @@ std::string BetaRowset::remote_tablet_path(int64_t
tablet_id) {
return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
}
+// Builds the remote path of the tablet meta file written by one replica:
+// data/{tablet_id}/{replica_id}.meta
+std::string BetaRowset::remote_tablet_meta_path(int64_t tablet_id, int64_t replica_id) {
+    const std::string tablet_dir = remote_tablet_path(tablet_id);
+    return fmt::format("{}/{}.meta", tablet_dir, replica_id);
+}
+
std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId&
rowset_id,
int segment_id) {
// data/{tablet_id}/{rowset_id}_{seg_num}.dat
@@ -135,7 +140,12 @@ Status
BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segm
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
auto seg_path = segment_file_path(seg_id);
std::shared_ptr<segment_v2::Segment> segment;
- auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema, &segment);
+ io::SegmentCachePathPolicy cache_policy;
+ cache_policy.set_cache_path(segment_cache_path(seg_id));
+ io::FileReaderOptions
reader_options(io::cache_type_from_string(config::file_cache_type),
+ cache_policy);
+ auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema,
+ reader_options, &segment);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << " under
rowset "
<< unique_id() << " : " << s.to_string();
@@ -157,7 +167,12 @@ Status BetaRowset::load_segments(int64_t seg_id_begin,
int64_t seg_id_end,
}
auto seg_path = segment_file_path(seg_id);
std::shared_ptr<segment_v2::Segment> segment;
- auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema, &segment);
+ io::SegmentCachePathPolicy cache_policy;
+ cache_policy.set_cache_path(segment_cache_path(seg_id));
+ io::FileReaderOptions
reader_options(io::cache_type_from_string(config::file_cache_type),
+ cache_policy);
+ auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema,
+ reader_options, &segment);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << " under
rowset "
<< unique_id() << " : " << s.to_string();
@@ -402,7 +417,12 @@ bool BetaRowset::check_current_rowset_segment() {
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
auto seg_path = segment_file_path(seg_id);
std::shared_ptr<segment_v2::Segment> segment;
- auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema, &segment);
+ io::SegmentCachePathPolicy cache_policy;
+ cache_policy.set_cache_path(segment_cache_path(seg_id));
+ io::FileReaderOptions
reader_options(io::cache_type_from_string(config::file_cache_type),
+ cache_policy);
+ auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema,
+ reader_options, &segment);
if (!s.ok()) {
LOG(WARNING) << "segment can not be opened. file=" << seg_path;
return false;
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 5ac860eaa1..83652d4e57 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -63,6 +63,8 @@ public:
static std::string remote_tablet_path(int64_t tablet_id);
+ static std::string remote_tablet_meta_path(int64_t tablet_id, int64_t
replica_id);
+
Status split_range(const RowCursor& start_key, const RowCursor& end_key,
uint64_t request_block_row_count, size_t key_num,
std::vector<OlapTuple>* ranges) override;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 6b72b7c9c6..7e3fea581a 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -385,8 +385,10 @@ Status BetaRowsetWriter::_load_noncompacted_segments(
auto seg_path =
BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id, seg_id);
std::shared_ptr<segment_v2::Segment> segment;
+ io::FileReaderOptions
reader_options(io::cache_type_from_string(config::file_cache_type),
+ io::SegmentCachePathPolicy());
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
- _context.tablet_schema, &segment);
+ _context.tablet_schema,
reader_options, &segment);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << ":" <<
s.to_string();
return Status::Error<ROWSET_LOAD_FAILED>();
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 15ed949025..b99ea4cff6 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -46,9 +46,8 @@ using io::FileCacheManager;
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t
segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
+ const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output) {
- io::FileReaderOptions
reader_options(io::cache_type_from_string(config::file_cache_type),
- io::SegmentCachePathPolicy());
io::FileReaderSPtr file_reader;
#ifndef BE_TEST
RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader,
nullptr));
diff --git a/be/src/olap/rowset/segment_v2/segment.h
b/be/src/olap/rowset/segment_v2/segment.h
index 70d3450901..1ec4943cf2 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -63,6 +63,7 @@ class Segment : public std::enable_shared_from_this<Segment> {
public:
static Status open(io::FileSystemSPtr fs, const std::string& path,
uint32_t segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
+ const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output);
~Segment();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a67ace80a5..c4b0f835aa 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -75,6 +75,7 @@ using std::nothrow;
using std::sort;
using std::string;
using std::vector;
+using io::FileSystemSPtr;
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count,
MetricUnit::OPERATIONS);
@@ -1438,6 +1439,10 @@ void Tablet::build_tablet_report_info(TTabletInfo*
tablet_info,
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
tablet_info->__set_replica_id(replica_id());
tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size());
+ tablet_info->__set_is_cooldown(!_tablet_meta->storage_policy().empty());
+ if (tablet_info->is_cooldown) {
+
tablet_info->__set_cooldown_replica_id(_tablet_meta->cooldown_replica_id());
+ }
}
// should use this method to get a copy of current tablet meta
@@ -1598,6 +1603,8 @@ Status Tablet::create_initial_rowset(const int64_t
req_version) {
context.rowset_state = VISIBLE;
context.segments_overlap = OVERLAP_UNKNOWN;
context.tablet_schema = tablet_schema();
+ context.oldest_write_timestamp = UnixSeconds();
+ context.newest_write_timestamp = UnixSeconds();
res = create_rowset_writer(context, &rs_writer);
if (!res.ok()) {
@@ -1683,11 +1690,22 @@ Status Tablet::cooldown() {
LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" <<
tablet_id();
return Status::Error<TRY_LOCK_FAILED>();
}
+
+ if (_tablet_meta->cooldown_replica_id() == _tablet_meta->replica_id()) {
+ RETURN_IF_ERROR(_cooldown_data());
+ } else {
+ RETURN_IF_ERROR(_follow_cooldowned_data());
+ }
+ return Status::OK();
+}
+
+Status Tablet::_cooldown_data() {
auto dest_fs = io::FileSystemMap::instance()->get(storage_policy());
if (!dest_fs) {
return Status::Error<UNINITIALIZED>();
}
DCHECK(dest_fs->type() == io::FileSystemType::S3);
+
auto old_rowset = pick_cooldown_rowset();
if (!old_rowset) {
LOG(WARNING) << "Cannot pick cooldown rowset in tablet " <<
tablet_id();
@@ -1717,27 +1735,125 @@ Status Tablet::cooldown() {
new_rowset_meta->set_resource_id(dest_fs->resource_id());
new_rowset_meta->set_fs(dest_fs);
new_rowset_meta->set_creation_time(time(nullptr));
+
+ // write remote tablet meta
+ TabletMetaPB remote_tablet_meta_pb;
+ _tablet_meta->to_meta_pb(true, &remote_tablet_meta_pb);
+ new_rowset_meta->to_rowset_pb(remote_tablet_meta_pb.add_rs_metas());
+ // upload rowset_meta to remote fs.
+ RETURN_IF_ERROR(_write_remote_tablet_meta(dest_fs, remote_tablet_meta_pb));
+
RowsetSharedPtr new_rowset;
RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta,
&new_rowset);
std::vector to_add {std::move(new_rowset)};
std::vector to_delete {std::move(old_rowset)};
- bool has_shutdown = false;
{
std::unique_lock meta_wlock(_meta_lock);
- has_shutdown = tablet_state() == TABLET_SHUTDOWN;
- if (!has_shutdown) {
+ if (tablet_state() != TABLET_SHUTDOWN) {
modify_rowsets(to_add, to_delete);
- _self_owned_remote_rowsets.insert(to_add.front());
save_meta();
}
}
- if (has_shutdown) {
- record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(),
- to_add.front()->num_segments());
- return Status::Aborted("tablet {} has shutdown", tablet_id());
+ return Status::OK();
+}
+
+// Reads the tablet meta file stored on `fs` under the cooldown replica's id and
+// parses it into `tablet_meta_pb`.
+// Returns OK once the file has been read and parsed. Returns InternalError when
+// the file does not exist, the reader is null, or parsing fails; any error from
+// exists()/open_file()/read_at() is returned as-is.
+Status Tablet::_read_remote_tablet_meta(FileSystemSPtr fs, TabletMetaPB* tablet_meta_pb) {
+    std::string remote_meta_path =
+            BetaRowset::remote_tablet_meta_path(tablet_id(), _tablet_meta->cooldown_replica_id());
+    bool exist = false;
+    RETURN_IF_ERROR(fs->exists(remote_meta_path, &exist));
+    if (!exist) {
+        LOG(INFO) << "No tablet meta file founded, init needed. tablet_id: " << tablet_id();
+        return Status::InternalError("No tablet meta file founded, init needed.");
+    }
+    IOContext io_ctx;
+    io::FileReaderSPtr tablet_meta_reader;
+    RETURN_IF_ERROR(fs->open_file(remote_meta_path, &tablet_meta_reader, &io_ctx));
+    if (tablet_meta_reader == nullptr) {
+        return Status::InternalError("tablet_meta_reader is null");
+    }
+    auto file_size = tablet_meta_reader->size();
+    size_t bytes_read = 0;
+    // Owning buffer, released on every return path; a raw new[] here would leak.
+    std::vector<uint8_t> buf(file_size);
+    Slice slice(buf.data(), file_size);
+    Status st = tablet_meta_reader->read_at(0, slice, io_ctx, &bytes_read);
+    tablet_meta_reader->close();
+    if (!st.ok()) {
+        return st;
+    }
+    if (!tablet_meta_pb->ParseFromString(slice.to_string())) {
+        LOG(WARNING) << "parse tablet meta failed";
+        return Status::InternalError("parse tablet meta failed");
+    }
+    // Success must return here; falling through would report "file not found"
+    // even though the meta was read and parsed.
+    return Status::OK();
+}
+
+// Serializes `tablet_meta_pb` and writes it to `fs` at the remote tablet meta
+// path of this replica (see BetaRowset::remote_tablet_meta_path).
+// Returns the first failure among create_file(), serialization, appendv() and
+// close(); OK only when all of them succeed.
+Status Tablet::_write_remote_tablet_meta(FileSystemSPtr fs, const TabletMetaPB& tablet_meta_pb) {
+    std::string remote_meta_path =
+            BetaRowset::remote_tablet_meta_path(tablet_id(), _tablet_meta->replica_id());
+    io::FileWriterPtr tablet_meta_writer;
+    RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
+    if (tablet_meta_writer == nullptr) {
+        return Status::InternalError("tablet_meta_writer is null");
+    }
+    string value;
+    if (!tablet_meta_pb.SerializeToString(&value)) {
+        tablet_meta_writer->close();
+        return Status::InternalError("serialize tablet meta failed");
+    }
+    // Point the slice at the serialized string itself; `value` outlives the
+    // appendv() call, so no extra heap copy (previously leaked) is needed.
+    Slice slice(value.data(), value.size());
+    Status st = tablet_meta_writer->appendv(&slice, 1);
+    // Always close the writer, and keep its status.
+    // NOTE(review): for remote writers close() may be where buffered data is
+    // actually flushed - confirm; either way its failure must not be dropped.
+    Status close_st = tablet_meta_writer->close();
+    if (!st.ok()) {
+        return st;
     }
+    return close_st;
+}
+
+// Follower side of cooldown: rather than uploading data itself, this replica
+// reads the tablet meta stored under the cooldown replica's id, builds rowsets
+// from the rowset metas listed there, and replaces every local rowset whose end
+// version is not greater than the highest remote end version.
+// Returns InternalError when the storage policy has no file system, and any
+// error from reading the remote tablet meta. Nothing is modified when the
+// tablet is in TABLET_SHUTDOWN state.
+Status Tablet::_follow_cooldowned_data() {
+    auto dest_fs = io::FileSystemMap::instance()->get(storage_policy());
+    if (!dest_fs) {
+        return Status::InternalError("storage_policy doesn't exist: " + storage_policy());
+    }
+    DCHECK(dest_fs->type() == io::FileSystemType::S3);
+    TabletMetaPB remote_tablet_meta_pb;
+    RETURN_IF_ERROR(_read_remote_tablet_meta(dest_fs, &remote_tablet_meta_pb));
+    // Highest end version in the remote meta; stays -1 when it lists no rowsets.
+    int64_t max_version = -1;
+    for (auto& rowset_meta_pb : remote_tablet_meta_pb.rs_metas()) {
+        if (max_version < rowset_meta_pb.end_version()) {
+            max_version = rowset_meta_pb.end_version();
+        }
+    }
+
+    std::vector<RowsetSharedPtr> to_add;
+    std::vector<RowsetSharedPtr> to_delete;
+    // Every remote rowset is adopted: max_version is the maximum over this same
+    // list, so an "end_version() > max_version" filter could never skip anything.
+    for (auto& rowset_meta_pb : remote_tablet_meta_pb.rs_metas()) {
+        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
+        rowset_meta->init_from_pb(rowset_meta_pb);
+        RowsetSharedPtr new_rowset;
+        RowsetFactory::create_rowset(_schema, _tablet_path, rowset_meta, &new_rowset);
+        to_add.push_back(new_rowset);
+    }
+
+    {
+        // modify_rowsets() and save_meta() change tablet state, so hold the
+        // exclusive lock, exactly as _cooldown_data() does for the same calls.
+        // A shared lock would let readers and other followers run concurrently.
+        std::unique_lock meta_wlock(_meta_lock);
+        if (tablet_state() != TABLET_SHUTDOWN) {
+            for (const auto& it : _rs_version_map) {
+                auto& rs = it.second;
+                if (rs->end_version() <= max_version) {
+                    to_delete.push_back(rs);
+                }
+            }
+            modify_rowsets(to_add, to_delete);
+            save_meta();
+        }
+    }
+
     return Status::OK();
 }
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c82fa461a1..e10b9628ac 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -379,6 +379,10 @@ private:
RowsetIdUnorderedSet* to_add,
RowsetIdUnorderedSet* to_del);
Status _load_rowset_segments(const RowsetSharedPtr& rowset,
std::vector<segment_v2::SegmentSharedPtr>*
segments);
+ Status _cooldown_data();
+ Status _follow_cooldowned_data();
+ Status _read_remote_tablet_meta(io::FileSystemSPtr fs, TabletMetaPB*
tablet_meta_pb);
+ Status _write_remote_tablet_meta(io::FileSystemSPtr fs, const
TabletMetaPB& tablet_meta_pb);
public:
static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index e3d4c6f3ff..936817bca6 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -253,6 +253,8 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_in_restore_mode(b._in_restore_mode),
_preferred_rowset_type(b._preferred_rowset_type),
_storage_policy(b._storage_policy),
+ _cooldown_replica_id(b._cooldown_replica_id),
+ _cooldown_term(b._cooldown_term),
_enable_unique_key_merge_on_write(b._enable_unique_key_merge_on_write),
_delete_bitmap(b._delete_bitmap) {};
@@ -521,6 +523,8 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
}
_storage_policy = tablet_meta_pb.storage_policy();
+ _cooldown_replica_id = -1;
+ _cooldown_term = -1;
if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
_enable_unique_key_merge_on_write =
tablet_meta_pb.enable_unique_key_merge_on_write();
}
@@ -544,6 +548,10 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
}
void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
+ to_meta_pb(false, tablet_meta_pb);
+}
+
+void TabletMeta::to_meta_pb(bool only_include_remote_rowset, TabletMetaPB*
tablet_meta_pb) {
tablet_meta_pb->set_table_id(table_id());
tablet_meta_pb->set_partition_id(partition_id());
tablet_meta_pb->set_tablet_id(tablet_id());
@@ -573,7 +581,9 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
}
for (auto& rs : _rs_metas) {
- rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
+ if ((only_include_remote_rowset && !rs->is_local()) ||
!only_include_remote_rowset) {
+ rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
+ }
}
for (auto rs : _stale_rs_metas) {
rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas());
@@ -866,6 +876,12 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
if (a._in_restore_mode != b._in_restore_mode) return false;
if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
if (a._storage_policy != b._storage_policy) return false;
+ if (a._cooldown_replica_id != b._cooldown_replica_id) {
+ return false;
+ }
+ if (a._cooldown_term != b._cooldown_term) {
+ return false;
+ }
return true;
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 3655b71602..f287662ad2 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -114,6 +114,7 @@ public:
void init_rs_metas_fs(const io::FileSystemSPtr& fs);
void to_meta_pb(TabletMetaPB* tablet_meta_pb);
+ void to_meta_pb(bool only_include_remote_rowset, TabletMetaPB*
tablet_meta_pb);
void to_json(std::string* json_string, json2pb::Pb2JsonOptions& options);
uint32_t mem_size() const;
@@ -200,6 +201,18 @@ public:
_storage_policy = policy;
}
+ const int64_t cooldown_replica_id() const { return _cooldown_replica_id; }
+
+    // Overwrites the stored cooldown replica id and cooldown term. No term
+    // comparison is done here; the new values are applied unconditionally.
+    void set_cooldown_replica_id_and_term(int64_t cooldown_replica_id, int64_t cooldown_term) {
+        // Log the tablet id, matching the "tablet_id" label (the table id was
+        // printed here before).
+        VLOG_NOTICE << "set tablet_id : " << tablet_id() << " cooldown_replica_id from "
+                    << _cooldown_replica_id << " to " << cooldown_replica_id
+                    << ", cooldown_term from " << _cooldown_term << " to " << cooldown_term;
+        _cooldown_replica_id = cooldown_replica_id;
+        _cooldown_term = cooldown_term;
+    }
+
+ const int64_t cooldown_term() const { return _cooldown_term; }
+
static void init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column);
@@ -243,6 +256,8 @@ private:
RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
std::string _storage_policy;
+ int64_t _cooldown_replica_id = -1;
+ int64_t _cooldown_term = -1;
// For unique key data model, the feature Merge-on-Write will leverage a
primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load
stage,
diff --git a/be/test/io/cache/remote_file_cache_test.cpp
b/be/test/io/cache/remote_file_cache_test.cpp
index 176e306a8a..1a1ad93750 100644
--- a/be/test/io/cache/remote_file_cache_test.cpp
+++ b/be/test/io/cache/remote_file_cache_test.cpp
@@ -141,7 +141,9 @@ protected:
EXPECT_NE("", writer.min_encoded_key().to_string());
EXPECT_NE("", writer.max_encoded_key().to_string());
- st = segment_v2::Segment::open(fs, path, 0, {}, query_schema, res);
+ io::FileReaderOptions reader_options(io::FileCachePolicy::NO_CACHE,
+ io::SegmentCachePathPolicy());
+ st = segment_v2::Segment::open(fs, path, 0, {}, query_schema,
reader_options, res);
EXPECT_TRUE(st.ok());
EXPECT_EQ(nrows, (*res)->num_rows());
}
diff --git a/be/test/tools/benchmark_tool.cpp b/be/test/tools/benchmark_tool.cpp
index 1bc3decb4b..b408570a35 100644
--- a/be/test/tools/benchmark_tool.cpp
+++ b/be/test/tools/benchmark_tool.cpp
@@ -363,7 +363,9 @@ public:
writer.finalize(&file_size, &index_size);
file_writer->close();
- Segment::open(fs, path, "", seg_id, {}, &_tablet_schema, res);
+ io::FileReaderOptions reader_options(io::FileCachePolicy::NO_CACHE,
+ io::SegmentCachePathPolicy());
+ Segment::open(fs, path, seg_id, {}, &_tablet_schema, reader_options,
res);
}
std::vector<std::vector<std::string>> generate_dataset(int rows_number) {
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 1e28dd1099..2ef346a4f3 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -48,8 +48,10 @@ public final class FeMetaVersion {
public static final int VERSION_113 = 113;
// add new recover info for recover ddl
public static final int VERSION_114 = 114;
+ // change replica meta to json
+ public static final int VERSION_115 = 115;
// note: when increment meta version, should assign the latest version to
VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_114;
+ public static final int VERSION_CURRENT = VERSION_115;
// all logs meta version should >= the minimum version, so that we could
remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index f6624c0b81..77568a1109 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -376,9 +376,10 @@ public class MaterializedViewHandler extends AlterHandler {
// index state is SHADOW
MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId,
IndexState.SHADOW);
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
- TabletMeta mvTabletMeta = new TabletMeta(dbId, tableId,
partitionId, mvIndexId, mvSchemaHash, medium);
short replicationNum =
olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
for (Tablet baseTablet : baseIndex.getTablets()) {
+ TabletMeta mvTabletMeta = new TabletMeta(
+ dbId, tableId, partitionId, mvIndexId, mvSchemaHash,
medium, -1, 0);
long baseTabletId = baseTablet.getId();
long mvTabletId = idGeneratorBuffer.getNextId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index c600c15f6d..78e17db6dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -654,10 +654,10 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
for (Long partitionId : partitionIdToRollupIndex.keySet()) {
MaterializedIndex rollupIndex =
partitionIdToRollupIndex.get(partitionId);
TStorageMedium medium =
tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
- TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId,
partitionId, rollupIndexId,
- rollupSchemaHash, medium);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
+ TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId,
partitionId, rollupIndexId,
+ rollupSchemaHash, medium, -1, 0);
invertedIndex.addTablet(rollupTablet.getId(),
rollupTabletMeta);
for (Replica rollupReplica : rollupTablet.getReplicas()) {
invertedIndex.addReplica(rollupTablet.getId(),
rollupReplica);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index d009aae41e..8429d9e975 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1486,11 +1486,11 @@ public class SchemaChangeHandler extends AlterHandler {
// index state is SHADOW
MaterializedIndex shadowIndex = new
MaterializedIndex(shadowIndexId, IndexState.SHADOW);
MaterializedIndex originIndex =
partition.getIndex(originIndexId);
- TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId,
partitionId, shadowIndexId, newSchemaHash,
- medium);
ReplicaAllocation replicaAlloc =
olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
for (Tablet originTablet : originIndex.getTablets()) {
+ TabletMeta shadowTabletMeta = new TabletMeta(dbId,
tableId, partitionId, shadowIndexId,
+ newSchemaHash, medium, -1, 0);
long originTabletId = originTablet.getId();
long shadowTabletId = idGeneratorBuffer.getNextId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 51321cac8e..e3a303b2ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -742,10 +742,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
MaterializedIndex shadowIndex = cell.getValue();
TStorageMedium medium =
olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
- TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId,
partitionId, shadowIndexId,
-
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium);
for (Tablet shadownTablet : shadowIndex.getTablets()) {
+ TabletMeta shadowTabletMeta = new TabletMeta(dbId,
tableId, partitionId, shadowIndexId,
+
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium, -1, 0);
invertedIndex.addTablet(shadownTablet.getId(),
shadowTabletMeta);
for (Replica shadowReplica : shadownTablet.getReplicas()) {
invertedIndex.addReplica(shadownTablet.getId(),
shadowReplica);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index dad2bf3b11..154e86aa15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -985,9 +985,10 @@ public class RestoreJob extends AbstractJob {
double bfFpp = localTbl.getBfFpp();
for (MaterializedIndex restoredIdx :
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta =
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
- TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
- restoredIdx.getId(), indexMeta.getSchemaHash(),
TStorageMedium.HDD);
for (Tablet restoreTablet : restoredIdx.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
+ restoredIdx.getId(), indexMeta.getSchemaHash(),
TStorageMedium.HDD,
+ restoreTablet.getCooldownReplicaId(),
restoreTablet.getCooldownTerm());
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(),
tabletMeta);
for (Replica restoreReplica : restoreTablet.getReplicas()) {
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1172,9 +1173,10 @@ public class RestoreJob extends AbstractJob {
// modify tablet inverted index
for (MaterializedIndex restoreIdx :
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
int schemaHash =
localTbl.getSchemaHashByIndexId(restoreIdx.getId());
- TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
- restoreIdx.getId(), schemaHash, TStorageMedium.HDD);
for (Tablet restoreTablet : restoreIdx.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
+ restoreIdx.getId(), schemaHash,
TStorageMedium.HDD, restoreTablet.getCooldownReplicaId(),
+ restoreTablet.getCooldownTerm());
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
for (Replica restoreReplica : restoreTablet.getReplicas())
{
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1204,9 +1206,10 @@ public class RestoreJob extends AbstractJob {
for (Partition restorePart : olapRestoreTbl.getPartitions()) {
for (MaterializedIndex restoreIdx :
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
int schemaHash =
olapRestoreTbl.getSchemaHashByIndexId(restoreIdx.getId());
- TabletMeta tabletMeta = new TabletMeta(db.getId(),
restoreTbl.getId(), restorePart.getId(),
- restoreIdx.getId(), schemaHash,
TStorageMedium.HDD);
for (Tablet restoreTablet : restoreIdx.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(db.getId(),
restoreTbl.getId(), restorePart.getId(),
+ restoreIdx.getId(), schemaHash,
TStorageMedium.HDD,
+ restoreTablet.getCooldownReplicaId(),
restoreTablet.getCooldownTerm());
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
for (Replica restoreReplica :
restoreTablet.getReplicas()) {
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index 011df7eb15..ec2588ae2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -839,8 +839,9 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium);
for (Tablet tablet : index.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium,
+ tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@@ -891,8 +892,9 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium);
for (Tablet tablet : index.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium,
+ tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 83bde25845..112360a3b3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -121,6 +121,7 @@ import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.consistency.ConsistencyChecker;
+import org.apache.doris.cooldown.CooldownHandler;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.EsExternalCatalog;
@@ -317,6 +318,7 @@ public class Env {
private DeleteHandler deleteHandler;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
+ private CooldownHandler cooldownHandler;
private MetastoreEventsProcessor metastoreEventsProcessor;
private MasterDaemon labelCleaner; // To clean old LabelInfo,
ExportJobInfos
@@ -551,6 +553,7 @@ public class Env {
this.deleteHandler = new DeleteHandler();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
this.partitionInMemoryInfoCollector = new
PartitionInMemoryInfoCollector();
+ this.cooldownHandler = new CooldownHandler();
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.replayedJournalId = new AtomicLong(0L);
@@ -1399,6 +1402,9 @@ public class Env {
dbUsedDataQuotaInfoCollector.start();
// start daemon thread to update global partition in memory
information periodically
partitionInMemoryInfoCollector.start();
+ if (Config.cooldown_single_remote_file) {
+ cooldownHandler.start();
+ }
streamLoadRecordMgr.start();
getInternalCatalog().getIcebergTableCreationRecordMgr().start();
new InternalSchemaInitializer().start();
@@ -1781,6 +1787,12 @@ public class Env {
return checksum;
}
+ public long loadCooldownJob(DataInputStream dis, long checksum) throws
IOException {
+ cooldownHandler.readField(dis);
+ LOG.info("finished replay loadCooldownJob from image");
+ return checksum;
+ }
+
public long loadAlterJob(DataInputStream dis, long checksum) throws
IOException {
long newChecksum = checksum;
for (JobType type : JobType.values()) {
@@ -2231,6 +2243,14 @@ public class Env {
return checksum;
}
+ /**
+ * Save CooldownJob.
+ */
+ public long saveCooldownJob(CountingDataOutputStream out, long checksum)
throws IOException {
+ Env.getCurrentEnv().getCooldownHandler().write(out);
+ return checksum;
+ }
+
public long saveMTMVJobManager(CountingDataOutputStream out, long
checksum) throws IOException {
if (Config.enable_mtmv_scheduler_framework) {
Env.getCurrentEnv().getMTMVJobManager().write(out, checksum);
@@ -3438,6 +3458,10 @@ public class Env {
return (MaterializedViewHandler)
this.alter.getMaterializedViewHandler();
}
+ public CooldownHandler getCooldownHandler() {
+ return cooldownHandler;
+ }
+
public SystemHandler getClusterHandler() {
return (SystemHandler) this.alter.getClusterHandler();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index fbe60b74c8..7f594afa7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -22,8 +22,11 @@ import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -87,6 +90,10 @@ public class Tablet extends MetaObject implements Writable {
private long checkedVersionHash;
@SerializedName(value = "isConsistent")
private boolean isConsistent;
+ @SerializedName(value = "cooldownReplicaId")
+ private long cooldownReplicaId;
+ @SerializedName(value = "cooldownTerm")
+ private long cooldownTerm;
// last time that the tablet checker checks this tablet.
// no need to persist
@@ -136,6 +143,22 @@ public class Tablet extends MetaObject implements Writable
{
return isConsistent;
}
+ public long getCooldownReplicaId() {
+ return cooldownReplicaId;
+ }
+
+ public void setCooldownReplicaId(long cooldownReplicaId) {
+ this.cooldownReplicaId = cooldownReplicaId;
+ }
+
+ public long getCooldownTerm() {
+ return cooldownTerm;
+ }
+
+ public void setCooldownTerm(long cooldownTerm) {
+ this.cooldownTerm = cooldownTerm;
+ }
+
private boolean deleteRedundantReplica(long backendId, long version) {
boolean delete = false;
boolean hasBackend = false;
@@ -326,18 +349,8 @@ public class Tablet extends MetaObject implements Writable
{
@Override
public void write(DataOutput out) throws IOException {
- super.write(out);
-
- out.writeLong(id);
- int replicaCount = replicas.size();
- out.writeInt(replicaCount);
- for (int i = 0; i < replicaCount; ++i) {
- replicas.get(i).write(out);
- }
-
- out.writeLong(checkedVersion);
- out.writeLong(checkedVersionHash);
- out.writeBoolean(isConsistent);
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
}
@Override
@@ -359,6 +372,11 @@ public class Tablet extends MetaObject implements Writable
{
}
public static Tablet read(DataInput in) throws IOException {
+ if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, Tablet.class);
+ }
+
Tablet tablet = new Tablet();
tablet.readFields(in);
return tablet;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 73fff8fd70..98fcf380f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -46,6 +46,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -65,7 +66,7 @@ public class TabletInvertedIndex {
public static final int NOT_EXIST_VALUE = -1;
public static final TabletMeta NOT_EXIST_TABLET_META = new
TabletMeta(NOT_EXIST_VALUE, NOT_EXIST_VALUE,
- NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE,
TStorageMedium.HDD);
+ NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE,
TStorageMedium.HDD, -1, -1);
private StampedLock lock = new StampedLock();
@@ -124,7 +125,8 @@ public class TabletInvertedIndex {
Map<Long, ListMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
- List<Triple<Long, Integer, Boolean>>
tabletToInMemory) {
+ List<Triple<Long, Integer, Boolean>>
tabletToInMemory,
+ Map<Long, TabletMeta> syncCooldownTabletMap) {
long stamp = readLock();
long start = System.currentTimeMillis();
try {
@@ -186,6 +188,11 @@ public class TabletInvertedIndex {
}
}
+ if (Config.cooldown_single_remote_file
+ && needChangeCooldownConf(tabletMeta,
backendTabletInfo)) {
+
syncCooldownTabletMap.put(backendTabletInfo.getTabletId(), tabletMeta);
+ }
+
long partitionId = tabletMeta.getPartitionId();
if (!Config.disable_storage_medium_check) {
// check if need migration
@@ -328,6 +335,50 @@ public class TabletInvertedIndex {
return false;
}
+    /**
+     * Decides whether the cooldown conf of a tablet reported by a BE has to be synced again.
+     *
+     * @param tabletMeta FE-side meta of the reported tablet
+     * @param beTabletInfo tablet info reported by the BE
+     * @return false if the BE does not report the tablet as cooldown; true if the cooldown replica id
+     *         differs between FE and BE, or if the id reported by the BE is not the id of any replica
+     *         recorded for this tablet; false otherwise
+     */
+    private boolean needChangeCooldownConf(TabletMeta tabletMeta, TTabletInfo beTabletInfo) {
+        if (!beTabletInfo.isIsCooldown()) {
+            return false;
+        }
+        // The cooldown replica id known to FE and the one reported by BE must agree.
+        if (tabletMeta.getCooldownReplicaId() != beTabletInfo.getCooldownReplicaId()) {
+            LOG.warn("cooldownReplicaId is wrong for tablet: {}, Fe: {}, Be: {}", beTabletInfo.getTabletId(),
+                    tabletMeta.getCooldownReplicaId(), beTabletInfo.getCooldownReplicaId());
+            return true;
+        }
+        // The reported cooldown replica must still be one of the replicas of this tablet.
+        // NOTE(review): readLock() is acquired here; StampedLock is not reentrant, so confirm that
+        // callers which already hold the read lock cannot block behind a waiting writer.
+        long stamp = readLock();
+        try {
+            for (Replica replica : replicaMetaTable.row(beTabletInfo.getTabletId()).values()) {
+                if (replica.getId() == beTabletInfo.getCooldownReplicaId()) {
+                    return false;
+                }
+            }
+            return true;
+        } finally {
+            readUnlock(stamp);
+        }
+    }
+
+    /**
+     * Returns a new list holding every replica currently recorded for the given tablet.
+     * The list is a copy taken under the read lock; it is empty if no replica is recorded.
+     *
+     * @param tabletId id of the tablet whose replicas are requested
+     * @return a fresh list of the tablet's replicas
+     */
+    public List<Replica> getReplicas(Long tabletId) {
+        long stamp = readLock();
+        try {
+            return new LinkedList<>(replicaMetaTable.row(tabletId).values());
+        } finally {
+            readUnlock(stamp);
+        }
+    }
+
/**
* Be will set `used' to false for bad replicas and `version_miss' to true
for replicas with hole
* in their version chain. In either case, those replicas need to be fixed
by TabletScheduler.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
index 2679977b6d..bfaaba3eb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
@@ -37,10 +37,14 @@ public class TabletMeta {
private TStorageMedium storageMedium;
+ private long cooldownReplicaId;
+
+ private long cooldownTerm;
+
private ReentrantReadWriteLock lock;
public TabletMeta(long dbId, long tableId, long partitionId, long indexId,
int schemaHash,
- TStorageMedium storageMedium) {
+ TStorageMedium storageMedium, long cooldownReplicaId, long
cooldownTerm) {
this.dbId = dbId;
this.tableId = tableId;
this.partitionId = partitionId;
@@ -50,6 +54,8 @@ public class TabletMeta {
this.newSchemaHash = -1;
this.storageMedium = storageMedium;
+ this.cooldownReplicaId = cooldownReplicaId;
+ this.cooldownTerm = cooldownTerm;
lock = new ReentrantReadWriteLock();
}
@@ -78,6 +84,22 @@ public class TabletMeta {
this.storageMedium = storageMedium;
}
+ public long getCooldownReplicaId() {
+ return cooldownReplicaId;
+ }
+
+ public void setCooldownReplicaId(long cooldownReplicaId) {
+ this.cooldownReplicaId = cooldownReplicaId;
+ }
+
+ public long getCooldownTerm() {
+ return cooldownTerm;
+ }
+
+ public void setCooldownTerm(long cooldownTerm) {
+ this.cooldownTerm = cooldownTerm;
+ }
+
public int getOldSchemaHash() {
lock.readLock().lock();
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 71cb5fd188..5e03365b52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -802,6 +802,13 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int alter_table_timeout_second = 86400 * 30; // 1month
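+ /**
+ * cooldown_single_remote_file: if true, FE starts the CooldownHandler daemon and checks the
+ * cooldown conf reported by BE in tablet reports.
+ * push_cooldown_conf_timeout_second: maximal timeout of push cooldown conf request.
+ */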
+ @ConfField(mutable = false, masterOnly = true)
+ public static boolean cooldown_single_remote_file = false;
+ @ConfField(mutable = false, masterOnly = true)
+ public static int push_cooldown_conf_timeout_second = 600; // 10 min
/**
* If a backend is down for *max_backend_down_time_second*, a BACKEND_DOWN
event will be triggered.
* Do not set this if you know what you are doing.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
new file mode 100644
index 0000000000..9275a2633c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This class represents the olap replica related metadata.
+ */
+public class CooldownConf implements Writable {
+ private static final Logger LOG = LogManager.getLogger(CooldownConf.class);
+
+ @SerializedName(value = "dbId")
+ protected long dbId;
+ @SerializedName(value = "tableId")
+ protected long tableId;
+ @SerializedName(value = "partitionId")
+ protected long partitionId;
+ @SerializedName(value = "indexId")
+ protected long indexId;
+ @SerializedName(value = "tabletId")
+ protected long tabletId;
+ @SerializedName(value = "cooldownReplicaId")
+ protected long cooldownReplicaId;
+ @SerializedName(value = "cooldownTerm")
+ protected long cooldownTerm;
+
+ public CooldownConf(long dbId, long tableId, long partitionId, long
indexId, long tabletId, long cooldownReplicaId,
+ long cooldownTerm) {
+ this.dbId = dbId;
+ this.tableId = tableId;
+ this.partitionId = partitionId;
+ this.indexId = indexId;
+ this.tabletId = tabletId;
+ this.cooldownReplicaId = cooldownReplicaId;
+ this.cooldownTerm = cooldownTerm;
+ }
+
+ public long getDbId() {
+ return dbId;
+ }
+
+ public void setDbId(long dbId) {
+ this.dbId = dbId;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
+ public void setTableId(long tableId) {
+ this.tableId = tableId;
+ }
+
+ public long getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(long partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public long getIndexId() {
+ return indexId;
+ }
+
+ public void setIndexId(long indexId) {
+ this.indexId = indexId;
+ }
+
+ public long getTabletId() {
+ return tabletId;
+ }
+
+ public void setTabletId(long tabletId) {
+ this.tabletId = tabletId;
+ }
+
+ public long getCooldownReplicaId() {
+ return cooldownReplicaId;
+ }
+
+ public void setCooldownReplicaId(long cooldownReplicaId) {
+ this.cooldownReplicaId = cooldownReplicaId;
+ }
+
+ public long getCooldownTerm() {
+ return cooldownTerm;
+ }
+
+ public void setCooldownTerm(long cooldownTerm) {
+ this.cooldownTerm = cooldownTerm;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static CooldownJob read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, CooldownJob.class);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
new file mode 100644
index 0000000000..c0ec6bd5e7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.common.DdlException;
+
+/*
+ * This exception is thrown when a cooldown job fails to run.
+ */
+public class CooldownException extends DdlException {
+
+ private static final long serialVersionUID = 4844951783432954268L;
+
+ public CooldownException(String msg) {
+ super(msg);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
new file mode 100644
index 0000000000..e8c5ba6c9a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.util.MasterDaemon;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Master daemon that groups tablets whose cooldown conf has to be synced into CooldownJobs
+ * and runs those jobs on a dedicated thread pool.
+ */
+public class CooldownHandler extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(CooldownHandler.class);
+    private static final int MAX_ACTIVE_COOLDOWN_JOB_SIZE = 10;
+
+    private static final int MAX_RUNABLE_COOLDOWN_JOB_SIZE = 100;
+
+    private static final int MAX_TABLET_PER_JOB = 100;
+
+    private static final long timeoutMs = 1000L * Config.push_cooldown_conf_timeout_second;
+
+    // jobId -> CooldownJob, holds the CooldownJobs which are used to send cooldown conf to BE.
+    private final Map<Long, CooldownJob> runableCooldownJobs = Maps.newConcurrentMap();
+    // tabletId -> true for every tablet that belongs to a job in runableCooldownJobs.
+    private final Map<Long, Boolean> resetingTablet = Maps.newConcurrentMap();
+    // jobId -> CooldownJob, the jobs which are currently executed by cooldownThreadPool.
+    public final Map<Long, CooldownJob> activeCooldownJobs = Maps.newConcurrentMap();
+
+    public final ThreadPoolExecutor cooldownThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(
+            MAX_ACTIVE_COOLDOWN_JOB_SIZE, "cooldown-pool", true);
+
+    // One shared generator. Seeding a new Random with the current millisecond for every tablet
+    // makes all tablets handled within the same millisecond draw the same replica index.
+    private final Random random = new Random();
+
+    /**
+     * Creates CooldownJobs for the tablets whose cooldown conf has to be synced.
+     * A tablet keeps its current cooldown replica if that replica still exists; otherwise a
+     * replica is picked at random and the cooldown term is increased by one. Tablets which
+     * already belong to a runnable job or have no replica are skipped. Processing stops as soon
+     * as MAX_RUNABLE_COOLDOWN_JOB_SIZE runnable jobs exist; confs collected so far that are not
+     * yet part of a job are dropped in that case.
+     *
+     * @param syncCooldownTabletMap tabletId -> TabletMeta of the tablets to sync
+     */
+    public void handleCooldownConf(Map<Long, TabletMeta> syncCooldownTabletMap) {
+        List<CooldownConf> cooldownConfList = new LinkedList<>();
+        for (Map.Entry<Long, TabletMeta> entry : syncCooldownTabletMap.entrySet()) {
+            if (runableCooldownJobs.size() >= MAX_RUNABLE_COOLDOWN_JOB_SIZE) {
+                return;
+            }
+            Long tabletId = entry.getKey();
+            TabletMeta tabletMeta = entry.getValue();
+            if (resetingTablet.containsKey(tabletId)) {
+                continue;
+            }
+            long cooldownReplicaId = -1;
+            List<Replica> replicas = Env.getCurrentInvertedIndex().getReplicas(tabletId);
+            if (replicas.size() == 0) {
+                continue;
+            }
+            for (Replica replica : replicas) {
+                if (tabletMeta.getCooldownReplicaId() == replica.getId()) {
+                    cooldownReplicaId = tabletMeta.getCooldownReplicaId();
+                    break;
+                }
+            }
+            long cooldownTerm = tabletMeta.getCooldownTerm();
+            if (cooldownReplicaId == -1) {
+                // The recorded cooldown replica is gone: pick a new one and start a new term.
+                int index = random.nextInt(replicas.size());
+                cooldownReplicaId = replicas.get(index).getId();
+                ++cooldownTerm;
+            }
+            CooldownConf cooldownConf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(),
+                    tabletMeta.getPartitionId(), tabletMeta.getIndexId(), tabletId, cooldownReplicaId, cooldownTerm);
+            cooldownConfList.add(cooldownConf);
+            if (cooldownConfList.size() >= MAX_TABLET_PER_JOB) {
+                submitCooldownJob(cooldownConfList);
+                cooldownConfList = new LinkedList<>();
+            }
+        }
+        if (cooldownConfList.size() > 0) {
+            submitCooldownJob(cooldownConfList);
+        }
+    }
+
+    // Registers one runnable CooldownJob for the given confs and marks their tablets as being reset.
+    private void submitCooldownJob(List<CooldownConf> cooldownConfList) {
+        long jobId = Env.getCurrentEnv().getNextId();
+        CooldownJob cooldownJob = new CooldownJob(jobId, cooldownConfList, timeoutMs);
+        runableCooldownJobs.put(jobId, cooldownJob);
+        for (CooldownConf conf : cooldownConfList) {
+            resetingTablet.put(conf.getTabletId(), true);
+        }
+    }
+
+    /**
+     * Drops finished jobs, then starts every runnable job that is not done and not active,
+     * as long as fewer than MAX_ACTIVE_COOLDOWN_JOB_SIZE jobs are active.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        clearFinishedOrCancelledCooldownJob();
+        runableCooldownJobs.values().forEach(cooldownJob -> {
+            if (!cooldownJob.isDone() && !activeCooldownJobs.containsKey(cooldownJob.getJobId())
+                    && activeCooldownJobs.size() < MAX_ACTIVE_COOLDOWN_JOB_SIZE) {
+                if (FeConstants.runningUnitTest) {
+                    cooldownJob.run();
+                } else {
+                    cooldownThreadPool.submit(() -> {
+                        // putIfAbsent keeps one job from being run by two pool threads at once.
+                        if (activeCooldownJobs.putIfAbsent(cooldownJob.getJobId(), cooldownJob) == null) {
+                            try {
+                                cooldownJob.run();
+                            } finally {
+                                activeCooldownJobs.remove(cooldownJob.getJobId());
+                            }
+                        }
+                    });
+                }
+            }
+        });
+    }
+
+    /**
+     * Writes the number of runnable jobs followed by each runnable job.
+     */
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(runableCooldownJobs.size());
+        for (CooldownJob cooldownJob : runableCooldownJobs.values()) {
+            cooldownJob.write(out);
+        }
+    }
+
+    /**
+     * Reads the jobs written by {@link #write(DataOutput)} and replays each of them.
+     */
+    public void readField(DataInput in) throws IOException {
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            CooldownJob cooldownJob = CooldownJob.read(in);
+            replayCooldownJob(cooldownJob);
+        }
+    }
+
+    /**
+     * Replays a persisted job: registers it (and marks its tablets) if it is unknown, applies the
+     * replayed state, and unregisters it again once it is done.
+     */
+    public void replayCooldownJob(CooldownJob cooldownJob) {
+        CooldownJob replayCooldownJob = null;
+        if (!runableCooldownJobs.containsKey(cooldownJob.getJobId())) {
+            replayCooldownJob = new CooldownJob(cooldownJob.jobId, cooldownJob.getCooldownConfList(),
+                    cooldownJob.timeoutMs);
+            runableCooldownJobs.put(cooldownJob.getJobId(), replayCooldownJob);
+            for (CooldownConf cooldownConf : cooldownJob.getCooldownConfList()) {
+                resetingTablet.put(cooldownConf.getTabletId(), true);
+            }
+        } else {
+            replayCooldownJob = runableCooldownJobs.get(cooldownJob.getJobId());
+        }
+        replayCooldownJob.replay(cooldownJob);
+        if (replayCooldownJob.isDone()) {
+            runableCooldownJobs.remove(cooldownJob.getJobId());
+            for (CooldownConf cooldownConf : cooldownJob.getCooldownConfList()) {
+                resetingTablet.remove(cooldownConf.getTabletId());
+            }
+        }
+    }
+
+    // Removes every done job from runableCooldownJobs and releases the tablets it had marked.
+    private void clearFinishedOrCancelledCooldownJob() {
+        Iterator<Map.Entry<Long, CooldownJob>> iterator = runableCooldownJobs.entrySet().iterator();
+        while (iterator.hasNext()) {
+            CooldownJob cooldownJob = iterator.next().getValue();
+            if (cooldownJob.isDone()) {
+                iterator.remove();
+                for (CooldownConf cooldownConf : cooldownJob.getCooldownConfList()) {
+                    resetingTablet.remove(cooldownConf.getTabletId());
+                }
+            }
+        }
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
new file mode 100644
index 0000000000..4c7eb168b6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.PushCooldownConfTask;
+import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class CooldownJob implements Writable {
+ private static final Logger LOG = LogManager.getLogger(CooldownJob.class);
+
+ /** Life cycle of a cooldown job; FINISHED and CANCELLED are terminal. */
+ public enum JobState {
+     PENDING, // job has been created, nothing has been applied yet
+     SEND_CONF, // cooldown conf is recorded in FE meta, tasks are about to be sent to BE
+     RUNNING, // tasks have been sent to BE, waiting for them to finish
+     FINISHED, // job is done
+     CANCELLED; // job was cancelled (failure or timeout)
+
+     /** Returns true when no further state transition can happen. */
+     public boolean isFinalState() {
+         return this == FINISHED || this == CANCELLED;
+     }
+ }
+
+ // Identifier of this job.
+ @SerializedName(value = "jobId")
+ protected long jobId;
+ // Current state of the job; the constructor starts it at PENDING.
+ @SerializedName(value = "jobState")
+ protected CooldownJob.JobState jobState;
+ // The cooldown confs this job applies; each names a tablet (by db/table/partition/index/tablet id)
+ // and the cooldown replica id and cooldown term to set on it.
+ @SerializedName(value = "cooldownConfList")
+ protected List<CooldownConf> cooldownConfList;
+
+ // Reason recorded when the job is cancelled; empty otherwise.
+ @SerializedName(value = "errMsg")
+ protected String errMsg = "";
+ // Creation time from System.currentTimeMillis(); the base of isTimeout().
+ @SerializedName(value = "createTimeMs")
+ protected long createTimeMs = -1;
+ // Time at which the job finished or was cancelled; -1 until then.
+ @SerializedName(value = "finishedTimeMs")
+ protected long finishedTimeMs = -1;
+ // Maximum age of the job in milliseconds before run() cancels it as timed out.
+ @SerializedName(value = "timeoutMs")
+ protected long timeoutMs = -1;
+
+ /** Returns the id of this job. */
+ public long getJobId() {
+ return jobId;
+ }
+
+ /** Returns the current state of this job. */
+ public JobState getJobState() {
+ return jobState;
+ }
+
+ /** Returns the internal conf list itself, not a copy. */
+ public List<CooldownConf> getCooldownConfList() {
+ return cooldownConfList;
+ }
+
+ /** Returns the timeout of this job in milliseconds. */
+ public long getTimeoutMs() {
+ return timeoutMs;
+ }
+
+ // Batch of PushCooldownConfTask built and submitted by runSendJob() and polled by runRunningJob().
+ // It carries no @SerializedName. NOTE(review): presumably it is therefore not part of the
+ // persisted JSON and starts empty after a restart or on a follower - confirm against GsonUtils.
+ private AgentBatchTask cooldownBatchTask = new AgentBatchTask();
+
+ /**
+  * Creates a job in PENDING state whose creation time is "now".
+  *
+  * @param jobId id of the job
+  * @param cooldownConfList confs to apply; the list is stored by reference, not copied
+  * @param timeoutMs maximum age of the job in milliseconds
+  */
+ public CooldownJob(long jobId, List<CooldownConf> cooldownConfList, long timeoutMs) {
+     this.jobId = jobId;
+     this.cooldownConfList = cooldownConfList;
+     this.timeoutMs = timeoutMs;
+     this.createTimeMs = System.currentTimeMillis();
+     this.jobState = JobState.PENDING;
+ }
+
+ /**
+  * Handles a job in PENDING state: moves it to SEND_CONF, records the cooldown replica id and
+  * cooldown term of every conf on the catalog Tablet and on the TabletMeta kept by the inverted
+  * index, then writes the job to the edit log.
+  *
+  * @throws CooldownException if setCooldownReplica() cannot resolve the database, table,
+  *         partition, index or tablet of a conf
+  */
+ protected void runPendingJob() throws CooldownException {
+     Preconditions.checkState(jobState == CooldownJob.JobState.PENDING, jobState);
+     this.jobState = JobState.SEND_CONF;
+     for (CooldownConf cooldownConf : cooldownConfList) {
+         setCooldownReplica(cooldownConf.getDbId(), cooldownConf.getTableId(), cooldownConf.getPartitionId(),
+                 cooldownConf.getIndexId(), cooldownConf.getTabletId(), cooldownConf.getCooldownReplicaId(),
+                 cooldownConf.getCooldownTerm());
+         // Look the meta up once and tolerate a missing entry, as replayPengingJob() does.
+         // Dereferencing the lookup result directly would raise a NullPointerException,
+         // which run() does not catch (it only handles CooldownException).
+         TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId());
+         if (tabletMeta == null) {
+             LOG.warn("tablet {} is not in inverted index, skip updating its cooldown conf. job: {}",
+                     cooldownConf.getTabletId(), jobId);
+             continue;
+         }
+         tabletMeta.setCooldownReplicaId(cooldownConf.getCooldownReplicaId());
+         tabletMeta.setCooldownTerm(cooldownConf.getCooldownTerm());
+     }
+     // write edit log
+     Env.getCurrentEnv().getEditLog().logCooldownJob(this);
+     LOG.info("send cooldown job {} state to {}", jobId, this.jobState);
+ }
+
+ /**
+  * Handles a job in SEND_CONF state. Unless running under unit test, it groups the confs by the
+  * backends that hold a replica of the conf's tablet, creates one PushCooldownConfTask per
+  * backend and submits them as a batch. Afterwards the job moves to RUNNING and is written to
+  * the edit log.
+  *
+  * @throws CooldownException if the database, table, partition, index or tablet of a conf
+  *         cannot be found
+  */
+ protected void runSendJob() throws CooldownException {
+     Preconditions.checkState(jobState == JobState.SEND_CONF, jobState);
+     LOG.info("begin to send cooldown conf tasks. job: {}", jobId);
+     if (!FeConstants.runningUnitTest) {
+         // backend id -> confs of the tablets that have a replica on that backend
+         Map<Long, List<CooldownConf>> confsByBackend = new HashMap<>();
+         for (CooldownConf conf : cooldownConfList) {
+             Database db = Env.getCurrentInternalCatalog().getDbOrException(conf.getDbId(),
+                     s -> new CooldownException("Database " + s + " does not exist"));
+             OlapTable tbl;
+             try {
+                 tbl = (OlapTable) db.getTableOrMetaException(conf.getTableId(), TableIf.TableType.OLAP);
+             } catch (MetaNotFoundException e) {
+                 throw new CooldownException(e.getMessage());
+             }
+             if (tbl == null) {
+                 throw new CooldownException(String.format("No table: %d", conf.getTableId()));
+             }
+             // the partition/index/tablet walk happens under the table read lock
+             tbl.readLock();
+             try {
+                 Partition partition = tbl.getPartition(conf.getPartitionId());
+                 if (partition == null) {
+                     throw new CooldownException(String.format("No partition: %d", conf.getPartitionId()));
+                 }
+                 MaterializedIndex index = partition.getIndex(conf.getIndexId());
+                 if (index == null) {
+                     throw new CooldownException(String.format("No index: %d", conf.getIndexId()));
+                 }
+                 Tablet tablet = index.getTablet(conf.getTabletId());
+                 if (tablet == null) {
+                     throw new CooldownException(String.format("No tablet: %d", conf.getTabletId()));
+                 }
+                 for (Replica replica : tablet.getReplicas()) {
+                     confsByBackend.computeIfAbsent(replica.getBackendId(), k -> new LinkedList<>()).add(conf);
+                 }
+             } finally {
+                 tbl.readUnlock();
+             }
+         }
+         confsByBackend.forEach((backendId, confs) ->
+                 cooldownBatchTask.addTask(new PushCooldownConfTask(backendId, confs)));
+         AgentTaskQueue.addBatchTask(cooldownBatchTask);
+         AgentTaskExecutor.submit(cooldownBatchTask);
+     }
+
+     this.jobState = JobState.RUNNING;
+     // write edit log
+     Env.getCurrentEnv().getEditLog().logCooldownJob(this);
+     LOG.info("send cooldown job {} state to {}", jobId, this.jobState);
+ }
+
+ /**
+  * Handles a job in RUNNING state. When the whole batch has finished, the job becomes FINISHED
+  * and is written to the edit log. Otherwise the unfinished tasks are inspected and the first
+  * one that has failed three times or more is dropped and reported through an exception.
+  *
+  * @throws CooldownException if a task failed at least three times
+  */
+ protected void runRunningJob() throws CooldownException {
+     if (cooldownBatchTask.isFinished()) {
+         this.jobState = CooldownJob.JobState.FINISHED;
+         this.finishedTimeMs = System.currentTimeMillis();
+
+         Env.getCurrentEnv().getEditLog().logCooldownJob(this);
+         LOG.info("push cooldown conf job finished: {}", jobId);
+         return;
+     }
+
+     LOG.info("cooldown tasks not finished. job: {}", jobId);
+     // NOTE(review): 2000 is presumably a cap on the number of tasks returned - confirm
+     for (AgentTask pending : cooldownBatchTask.getUnfinishedTasks(2000)) {
+         if (pending.getFailedTimes() < 3) {
+             continue;
+         }
+         pending.setFinished(true);
+         AgentTaskQueue.removeTask(pending.getBackendId(), TTaskType.PUSH_COOLDOWN_CONF,
+                 pending.getSignature());
+         LOG.warn("push cooldown conf task failed after try three times: " + pending.getErrorMsg());
+         throw new CooldownException("cooldown tasks failed on backend: " + pending.getBackendId());
+     }
+ }
+
+ public boolean isTimeout() {
+ return System.currentTimeMillis() - createTimeMs > timeoutMs;
+ }
+
+ public boolean isDone() {
+ return jobState.isFinalState();
+ }
+
+ /*
+  * Cancels the job unless it has already reached a final state: removes its agent tasks,
+  * marks it CANCELLED, records the reason and the finish time, and writes the job to the
+  * edit log. It may be called in any state.
+  */
+ protected synchronized void cancelImpl(String errMsg) {
+     if (!jobState.isFinalState()) {
+         cancelInternal();
+
+         this.errMsg = errMsg;
+         this.finishedTimeMs = System.currentTimeMillis();
+         LOG.info("cancel cooldown job {}, err: {}", jobId, errMsg);
+         Env.getCurrentEnv().getEditLog().logCooldownJob(this);
+     }
+ }
+
+ /**
+  * Advances the job by one step according to its current state; a job that has exceeded its
+  * timeout is cancelled instead. A CooldownException raised by a step cancels the job with the
+  * exception message as reason.
+  *
+  * The method is synchronized (as is cancelImpl()) so that only one thread drives the job at a
+  * time.
+  *
+  * lock order:
+  *      synchronized
+  *      db lock
+  */
+ public synchronized void run() {
+     if (isTimeout()) {
+         cancelImpl("Timeout");
+     } else {
+         try {
+             switch (jobState) {
+                 case PENDING:
+                     runPendingJob();
+                     break;
+                 case SEND_CONF:
+                     runSendJob();
+                     break;
+                 case RUNNING:
+                     runRunningJob();
+                     break;
+                 default:
+                     // FINISHED / CANCELLED: nothing left to do
+                     break;
+             }
+         } catch (CooldownException e) {
+             cancelImpl(e.getMessage());
+         }
+     }
+ }
+
+ /**
+  * Applies a replayed job record to this job. The handler is selected by the state stored in
+  * the record (not by this job's own state):
+  *   PENDING   -> replayCreateJob
+  *   SEND_CONF -> replayPengingJob
+  *   FINISHED  -> replayRunningJob
+  *   CANCELLED -> replayCancelled
+  * A CooldownException raised while replaying is logged and swallowed.
+  *
+  * NOTE(review): a record in RUNNING state (written by runSendJob()) falls into the default
+  * branch and is ignored, so a replaying node keeps the job in SEND_CONF until FINISHED or
+  * CANCELLED arrives - confirm this is intended.
+  */
+ public void replay(CooldownJob replayedJob) {
+ try {
+ switch (replayedJob.jobState) {
+ case PENDING:
+ replayCreateJob(replayedJob);
+ break;
+ case SEND_CONF:
+ replayPengingJob();
+ break;
+ case FINISHED:
+ replayRunningJob(replayedJob);
+ break;
+ case CANCELLED:
+ replayCancelled(replayedJob);
+ break;
+ default:
+ // RUNNING (and any other state) is not replayed
+ break;
+ }
+ } catch (CooldownException e) {
+ LOG.warn("[INCONSISTENT META] replay cooldown job failed {}",
replayedJob.jobId, e);
+ }
+ }
+
+ /** Writes this job as a JSON string produced by GsonUtils.GSON. */
+ @Override
+ public void write(DataOutput out) throws IOException {
+     Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ /** Reads a JSON string written by write() and turns it back into a CooldownJob. */
+ public static CooldownJob read(DataInput in) throws IOException {
+     return GsonUtils.GSON.fromJson(Text.readString(in), CooldownJob.class);
+ }
+
+ /**
+  * Replays a record in PENDING state: copies the job id, the confs, the creation time and the
+  * timeout from the record and puts this job into PENDING state.
+  *
+  * The confs are copied into a fresh list that replaces cooldownConfList. The previous code
+  * appended to the existing list, which duplicated confs on a repeated replay and, because
+  * CooldownHandler.replayCooldownJob() may construct this job with the very list owned by
+  * replayedJob, modified the list while iterating over it (ConcurrentModificationException).
+  *
+  * @param replayedJob the job read from the edit log or image
+  */
+ private void replayCreateJob(CooldownJob replayedJob) {
+     jobId = replayedJob.jobId;
+     List<CooldownConf> copiedConfList = new LinkedList<>();
+     for (CooldownConf replayedConf : replayedJob.cooldownConfList) {
+         copiedConfList.add(new CooldownConf(replayedConf.getDbId(), replayedConf.getTableId(),
+                 replayedConf.getPartitionId(), replayedConf.getIndexId(), replayedConf.getTabletId(),
+                 replayedConf.getCooldownReplicaId(), replayedConf.getCooldownTerm()));
+     }
+     // assign only after the iteration is over, so an aliased source list is never mutated
+     cooldownConfList = copiedConfList;
+     createTimeMs = replayedJob.createTimeMs;
+     timeoutMs = replayedJob.timeoutMs;
+     jobState = JobState.PENDING;
+     LOG.info("replay create cooldown job: {}, conf size: {}", jobId, cooldownConfList.size());
+ }
+
+ /**
+  * Replays a record in SEND_CONF state, i.e. the changes made by runPendingJob(): sets the
+  * cooldown replica id and cooldown term of every conf on the catalog Tablet and, when the
+  * tablet is present in the inverted index, on its TabletMeta; then moves this job to SEND_CONF.
+  *
+  * @throws CooldownException if setCooldownReplica() cannot resolve the database, table,
+  *         partition, index or tablet of a conf
+  */
+ private void replayPengingJob() throws CooldownException {
+     for (CooldownConf cooldownConf : cooldownConfList) {
+         setCooldownReplica(cooldownConf.getDbId(), cooldownConf.getTableId(), cooldownConf.getPartitionId(),
+                 cooldownConf.getIndexId(), cooldownConf.getTabletId(), cooldownConf.getCooldownReplicaId(),
+                 cooldownConf.getCooldownTerm());
+         // Single lookup: the null check and the updates act on the same object, so the entry
+         // cannot disappear between the check and the use.
+         TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId());
+         if (tabletMeta != null) {
+             tabletMeta.setCooldownReplicaId(cooldownConf.getCooldownReplicaId());
+             tabletMeta.setCooldownTerm(cooldownConf.getCooldownTerm());
+         }
+     }
+     jobState = JobState.SEND_CONF;
+     LOG.info("replay send cooldown conf, job: {}", jobId);
+ }
+
+ /**
+  * Replays a record in FINISHED state, i.e. the state change made at the end of
+  * runRunningJob(): takes over the finish time and marks this job FINISHED.
+  */
+ private void replayRunningJob(CooldownJob replayedJob) throws CooldownException {
+     this.finishedTimeMs = replayedJob.finishedTimeMs;
+     this.jobState = JobState.FINISHED;
+     LOG.info("replay finished cooldown job: {}", jobId);
+ }
+
+ /**
+  * Sets the cooldown replica id and the cooldown term on one catalog Tablet, holding the
+  * table write lock while the partition, index and tablet are looked up and updated.
+  *
+  * @throws CooldownException if the database or table does not exist, or if the partition,
+  *         index or tablet cannot be found inside the table
+  */
+ private void setCooldownReplica(long dbId, long tableId, long partitionId, long indexId, long tabletId,
+         long cooldownReplicaId, long cooldownTerm) throws CooldownException {
+     Database db = Env.getCurrentInternalCatalog()
+             .getDbOrException(dbId, s -> new CooldownException("Database " + s + " does not exist"));
+     OlapTable tbl;
+     try {
+         tbl = (OlapTable) db.getTableOrMetaException(tableId, TableIf.TableType.OLAP);
+     } catch (MetaNotFoundException e) {
+         throw new CooldownException(e.getMessage());
+     }
+     if (tbl == null) {
+         return;
+     }
+     tbl.writeLock();
+     try {
+         Partition partition = tbl.getPartition(partitionId);
+         MaterializedIndex index = partition == null ? null : partition.getIndex(indexId);
+         Tablet tablet = index == null ? null : index.getTablet(tabletId);
+         if (tablet == null) {
+             throw new CooldownException("set cooldown type failed.");
+         }
+         tablet.setCooldownReplicaId(cooldownReplicaId);
+         tablet.setCooldownTerm(cooldownTerm);
+         // This method is reached from runPendingJob()/replayPengingJob(), not from cancel,
+         // so the message no longer claims "when cancel job" and now includes the term.
+         LOG.info("set cooldown replica id to {} and cooldown term to {} for tablet {}, job: {}",
+                 cooldownReplicaId, cooldownTerm, tablet.getId(), jobId);
+     } finally {
+         tbl.writeUnlock();
+     }
+ }
+
+ /** Removes this job's batch of PUSH_COOLDOWN_CONF tasks from the agent task queue and marks the job CANCELLED. */
+ private void cancelInternal() {
+     // drop the tasks first (a no-op if none were queued), then switch the state
+     AgentTaskQueue.removeBatchTask(cooldownBatchTask, TTaskType.PUSH_COOLDOWN_CONF);
+     this.jobState = JobState.CANCELLED;
+ }
+
+ /**
+  * Replays a record in CANCELLED state: cancels this job locally and takes over the
+  * error message and the finish time from the record.
+  */
+ private void replayCancelled(CooldownJob replayedJob) {
+     // cancelInternal() already leaves the job in CANCELLED state
+     cancelInternal();
+     this.errMsg = replayedJob.errMsg;
+     this.finishedTimeMs = replayedJob.finishedTimeMs;
+     LOG.info("replay cancelled cooldown job: {}", jobId);
+ }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 261a1704af..04c03cbdef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -361,8 +361,9 @@ public class InternalCatalog implements CatalogIf<Database>
{
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = index.getId();
int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium);
for (Tablet tablet : index.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(dbId,
tableId, partitionId, indexId, schemaHash,
+ medium, tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@@ -1285,8 +1286,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
for (MaterializedIndex mIndex :
partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = mIndex.getId();
int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium);
for (Tablet tablet : mIndex.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(dbId,
tableId, partitionId, indexId, schemaHash,
+ medium, tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@@ -1547,9 +1549,10 @@ public class InternalCatalog implements
CatalogIf<Database> {
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(info.getDbId(),
info.getTableId(), partition.getId(),
- index.getId(), schemaHash,
info.getDataProperty().getStorageMedium());
for (Tablet tablet : index.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(info.getDbId(),
info.getTableId(), partition.getId(),
+ index.getId(), schemaHash,
info.getDataProperty().getStorageMedium(),
+ tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@@ -1699,7 +1702,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
// create tablets
int schemaHash = indexMeta.getSchemaHash();
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, schemaHash, storageMedium);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, schemaHash, storageMedium, -1,
+ -1);
createTablets(clusterName, index, ReplicaState.NORMAL,
distributionInfo, version, replicaAlloc, tabletMeta,
tabletIdSet, idGeneratorBuffer);
@@ -2673,9 +2677,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
for (MaterializedIndex mIndex :
partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = mIndex.getId();
int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
- TabletMeta tabletMeta = new TabletMeta(db.getId(),
olapTable.getId(), partitionId, indexId,
- schemaHash, medium);
for (Tablet tablet : mIndex.getTablets()) {
+ TabletMeta tabletMeta = new TabletMeta(db.getId(),
olapTable.getId(), partitionId, indexId,
+ schemaHash, medium,
tablet.getCooldownReplicaId(), tablet.getCooldownTerm());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index fb8fbb5b99..232509f60b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -36,6 +36,7 @@ import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
+import org.apache.doris.cooldown.CooldownJob;
import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -579,6 +580,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_PUSH_COOLDOWN_CONF: {
+ data = CooldownJob.read(in);
+ isRead = true;
+ break;
+ }
case OperationType.OP_BATCH_ADD_ROLLUP: {
data = BatchAlterJobPersistInfo.read(in);
isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 6868de1a9b..021f049aa0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -44,6 +44,7 @@ import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.task.PushCooldownConfTask;
import org.apache.doris.task.PushTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.task.StorageMediaMigrationTask;
@@ -196,6 +197,9 @@ public class MasterImpl {
case UPDATE_TABLET_META_INFO:
finishUpdateTabletMeta(task, request);
break;
+ case PUSH_COOLDOWN_CONF:
+ finishPushCooldownConfTask(task);
+ break;
default:
break;
}
@@ -598,4 +602,10 @@ public class MasterImpl {
}
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER,
task.getSignature());
}
+
+ /**
+  * Completion handler for a PUSH_COOLDOWN_CONF task: marks the task finished and
+  * removes it from the agent task queue.
+  */
+ private void finishPushCooldownConfTask(AgentTask task) {
+     // the cast also asserts that the task really is a PushCooldownConfTask
+     PushCooldownConfTask pushTask = (PushCooldownConfTask) task;
+     pushTask.setFinished(true);
+     AgentTaskQueue.removeTask(pushTask.getBackendId(), TTaskType.PUSH_COOLDOWN_CONF,
+             pushTask.getSignature());
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index c8e03e4d6b..50add0b24e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -259,6 +259,8 @@ public class ReportHandler extends Daemon {
Set<Long> tabletFoundInMeta = Sets.newConcurrentHashSet();
// storage medium -> tablet id
ListMultimap<TStorageMedium, Long> tabletMigrationMap =
LinkedListMultimap.create();
+ // tablets whose cooldown conf needs to be synced: tabletId -> TabletMeta
+ Map<Long, TabletMeta> syncCooldownTabletMap = new HashMap<>();
// dbid -> txn id -> [partition info]
Map<Long, ListMultimap<Long, TPartitionVersionInfo>>
transactionsToPublish = Maps.newHashMap();
@@ -278,7 +280,8 @@ public class ReportHandler extends Daemon {
transactionsToPublish,
transactionsToClear,
tabletRecoveryMap,
- tabletToInMemory);
+ tabletToInMemory,
+ syncCooldownTabletMap);
// 2. sync
if (!tabletSyncMap.isEmpty()) {
@@ -321,6 +324,11 @@ public class ReportHandler extends Daemon {
handleSetTabletInMemory(backendId, tabletToInMemory);
}
+ // 10. hand the tablets whose cooldown conf needs syncing to CooldownHandler
+ if (!syncCooldownTabletMap.isEmpty()) {
+
Env.getCurrentEnv().getCooldownHandler().handleCooldownConf(syncCooldownTabletMap);
+ }
+
final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
Backend reportBackend = currentSystemInfo.getBackend(backendId);
if (reportBackend != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 6ac6ab0700..fe757ff3aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
+import org.apache.doris.cooldown.CooldownJob;
import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -744,6 +745,10 @@ public class EditLog {
}
break;
}
+ case OperationType.OP_PUSH_COOLDOWN_CONF:
+ CooldownJob cooldownJob = (CooldownJob) journal.getData();
+ env.getCooldownHandler().replayCooldownJob(cooldownJob);
+ break;
case OperationType.OP_BATCH_ADD_ROLLUP: {
BatchAlterJobPersistInfo batchAlterJobV2 =
(BatchAlterJobPersistInfo) journal.getData();
for (AlterJobV2 alterJobV2 :
batchAlterJobV2.getAlterJobV2List()) {
@@ -1505,6 +1510,10 @@ public class EditLog {
logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
}
+ /**
+  * Writes the given cooldown job to the edit log as an OP_PUSH_COOLDOWN_CONF entry.
+  */
+ public void logCooldownJob(CooldownJob cooldownJob) {
+ logEdit(OperationType.OP_PUSH_COOLDOWN_CONF, cooldownJob);
+ }
+
public void logBatchAlterJob(BatchAlterJobPersistInfo batchAlterJobV2) {
logEdit(OperationType.OP_BATCH_ADD_ROLLUP, batchAlterJobV2);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index c5889a8001..13009ba85d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -76,6 +76,8 @@ public class OperationType {
//schema change for add and drop columns
public static final short OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 128;
+ // set cooldown conf in replica
+ public static final short OP_PUSH_COOLDOWN_CONF = 129;
// 30~39 130~139 230~239 ...
// load job for only hadoop load
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
index 5ab8c19d0e..896aaae698 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
@@ -209,6 +209,12 @@ public class MetaPersistMethod {
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveMTMVJobManager",
CountingDataOutputStream.class, long.class);
break;
+ case "cooldownJob":
+ metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadCooldownJob", DataInputStream.class,
+ long.class);
+ metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveCooldownJob",
+ CountingDataOutputStream.class, long.class);
+ break;
default:
break;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
index 5defa01e9d..e4db00bc8e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
@@ -38,7 +38,7 @@ public class PersistMetaModules {
"masterInfo", "frontends", "backends", "datasource", "db",
"loadJob", "alterJob", "recycleBin",
"globalVariable", "cluster", "broker", "resources", "exportJob",
"syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex",
"routineLoadJobs", "loadJobV2", "smallFiles",
- "plugins", "deleteHandler", "sqlBlockRule", "policy",
"mtmvJobManager");
+ "plugins", "deleteHandler", "sqlBlockRule", "policy",
"mtmvJobManager", "cooldownJob");
static {
MODULES_MAP = Maps.newHashMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 2319bf8886..0208058166 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -37,6 +37,7 @@ import org.apache.doris.thrift.TGetStoragePolicy;
import org.apache.doris.thrift.TMoveDirReq;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishVersionRequest;
+import org.apache.doris.thrift.TPushCooldownConfReq;
import org.apache.doris.thrift.TPushReq;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TReleaseSnapshotRequest;
@@ -359,6 +360,15 @@ public class AgentBatchTask implements Runnable {
tAgentTaskRequest.setUpdatePolicy(request);
return tAgentTaskRequest;
}
+ case PUSH_COOLDOWN_CONF: {
+ PushCooldownConfTask pushCooldownConfTask =
(PushCooldownConfTask) task;
+ TPushCooldownConfReq request = pushCooldownConfTask.toThrift();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(request.toString());
+ }
+ tAgentTaskRequest.setPushCooldownConf(request);
+ return tAgentTaskRequest;
+ }
default:
LOG.debug("could not find task type for task [{}]", task);
return null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/PushCooldownConfTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PushCooldownConfTask.java
new file mode 100644
index 0000000000..06ea11d659
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushCooldownConfTask.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.cooldown.CooldownConf;
+import org.apache.doris.thrift.TCooldownConf;
+import org.apache.doris.thrift.TPushCooldownConfReq;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Agent task that pushes a list of cooldown confs (tablet id, cooldown replica id,
+ * cooldown term) to one backend.
+ */
+public class PushCooldownConfTask extends AgentTask {
+    private static final Logger LOG = LogManager.getLogger(PushCooldownConfTask.class);
+
+    // confs carried by this task; stored by reference
+    private List<CooldownConf> cooldownConfList;
+
+    /**
+     * @param backendId backend the task is sent to
+     * @param cooldownConfList confs to push to that backend
+     */
+    public PushCooldownConfTask(long backendId, List<CooldownConf> cooldownConfList) {
+        // the task is not bound to one db/table/partition/index/tablet, hence the -1 ids
+        super(null, backendId, TTaskType.PUSH_COOLDOWN_CONF, -1, -1, -1, -1, -1);
+
+        this.cooldownConfList = cooldownConfList;
+    }
+
+    /** Builds the thrift request, adding one TCooldownConf per conf of this task. */
+    public TPushCooldownConfReq toThrift() {
+        TPushCooldownConfReq request = new TPushCooldownConfReq();
+        for (CooldownConf conf : cooldownConfList) {
+            TCooldownConf thriftConf = new TCooldownConf();
+            thriftConf.setTabletId(conf.getTabletId());
+            thriftConf.setCooldownReplicaId(conf.getCooldownReplicaId());
+            thriftConf.setCooldownTerm(conf.getCooldownTerm());
+            request.addToCooldownConfs(thriftConf);
+        }
+        return request;
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
index 1c8a0130cc..42e788b537 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
@@ -249,7 +249,7 @@ public class CatalogMocker {
Tablet tablet0 = new Tablet(TEST_TABLET0_ID);
TabletMeta tabletMeta = new TabletMeta(TEST_DB_ID, TEST_TBL_ID,
TEST_SINGLE_PARTITION_ID,
- TEST_TBL_ID, SCHEMA_HASH,
TStorageMedium.HDD);
+ TEST_TBL_ID, SCHEMA_HASH,
TStorageMedium.HDD, -1, -1);
baseIndex.addTablet(tablet0, tabletMeta);
Replica replica0 = new Replica(TEST_REPLICA0_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica1 = new Replica(TEST_REPLICA1_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
@@ -323,7 +323,7 @@ public class CatalogMocker {
Tablet baseTabletP1 = new Tablet(TEST_BASE_TABLET_P1_ID);
TabletMeta tabletMetaBaseTabletP1 = new TabletMeta(TEST_DB_ID,
TEST_TBL2_ID, TEST_PARTITION1_ID,
- TEST_TBL2_ID,
SCHEMA_HASH, TStorageMedium.HDD);
+ TEST_TBL2_ID,
SCHEMA_HASH, TStorageMedium.HDD, -1, -1);
baseIndexP1.addTablet(baseTabletP1, tabletMetaBaseTabletP1);
Replica replica3 = new Replica(TEST_REPLICA3_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica4 = new Replica(TEST_REPLICA4_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
@@ -335,7 +335,7 @@ public class CatalogMocker {
Tablet baseTabletP2 = new Tablet(TEST_BASE_TABLET_P2_ID);
TabletMeta tabletMetaBaseTabletP2 = new TabletMeta(TEST_DB_ID,
TEST_TBL2_ID, TEST_PARTITION2_ID,
- TEST_TBL2_ID,
SCHEMA_HASH, TStorageMedium.HDD);
+ TEST_TBL2_ID,
SCHEMA_HASH, TStorageMedium.HDD, -1, -1);
baseIndexP2.addTablet(baseTabletP2, tabletMetaBaseTabletP2);
Replica replica6 = new Replica(TEST_REPLICA6_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica7 = new Replica(TEST_REPLICA7_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
@@ -356,7 +356,7 @@ public class CatalogMocker {
Tablet rollupTabletP1 = new Tablet(TEST_ROLLUP_TABLET_P1_ID);
TabletMeta tabletMetaRollupTabletP1 = new TabletMeta(TEST_DB_ID,
TEST_TBL2_ID, TEST_PARTITION1_ID,
TEST_ROLLUP_TABLET_P1_ID, ROLLUP_SCHEMA_HASH,
-
TStorageMedium.HDD);
+
TStorageMedium.HDD, -1, -1);
rollupIndexP1.addTablet(rollupTabletP1, tabletMetaRollupTabletP1);
Replica replica9 = new Replica(TEST_REPLICA9_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica10 = new Replica(TEST_REPLICA10_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
@@ -373,7 +373,7 @@ public class CatalogMocker {
Tablet rollupTabletP2 = new Tablet(TEST_ROLLUP_TABLET_P2_ID);
TabletMeta tabletMetaRollupTabletP2 = new TabletMeta(TEST_DB_ID,
TEST_TBL2_ID, TEST_PARTITION1_ID,
TEST_ROLLUP_TABLET_P2_ID, ROLLUP_SCHEMA_HASH,
-
TStorageMedium.HDD);
+
TStorageMedium.HDD, -1, -1);
rollupIndexP2.addTablet(rollupTabletP2, tabletMetaRollupTabletP2);
Replica replica12 = new Replica(TEST_REPLICA12_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica13 = new Replica(TEST_REPLICA13_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 0c180e68aa..d79855ce67 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -193,7 +193,7 @@ public class CatalogTestUtil {
// index
MaterializedIndex index = new MaterializedIndex(indexId,
IndexState.NORMAL);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 0, TStorageMedium.HDD);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 0, TStorageMedium.HDD, -1, -1);
index.addTablet(tablet, tabletMeta);
tablet.addReplica(replica1);
@@ -260,7 +260,8 @@ public class CatalogTestUtil {
// index
MaterializedIndex index = new MaterializedIndex(testIndexId2,
IndexState.NORMAL);
- TabletMeta tabletMeta = new TabletMeta(testDbId1, testTableId2,
testPartitionId2, testIndexId2, 0, TStorageMedium.HDD);
+ TabletMeta tabletMeta = new TabletMeta(testDbId1, testTableId2,
testPartitionId2, testIndexId2, 0,
+ TStorageMedium.HDD, -1, -1);
index.addTablet(tablet, tabletMeta);
tablet.addReplica(replica);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index d7fdb2694a..14e20a1bd7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -66,7 +66,7 @@ public class TabletTest {
};
tablet = new Tablet(1);
- TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1,
TStorageMedium.HDD);
+ TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1,
TStorageMedium.HDD, -1, -1);
invertedIndex.addTablet(1, tabletMeta);
replica1 = new Replica(1L, 1L, 100L, 0, 200000L, 0, 3000L,
ReplicaState.NORMAL, 0, 0);
replica2 = new Replica(2L, 2L, 100L, 0, 200000L, 0, 3000L,
ReplicaState.NORMAL, 0, 0);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
index c81c3839c4..1ac66444ef 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
@@ -145,16 +145,16 @@ public class ClusterLoadStatisticsTest {
// tablet
invertedIndex = new TabletInvertedIndex();
- invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD));
+ invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD, -1, -1));
invertedIndex.addReplica(50000, new Replica(50001, be1.getId(), 0,
ReplicaState.NORMAL));
invertedIndex.addReplica(50000, new Replica(50002, be2.getId(), 0,
ReplicaState.NORMAL));
invertedIndex.addReplica(50000, new Replica(50003, be3.getId(), 0,
ReplicaState.NORMAL));
- invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD));
+ invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD, -1, -1));
invertedIndex.addReplica(60000, new Replica(60002, be2.getId(), 0,
ReplicaState.NORMAL));
invertedIndex.addReplica(60000, new Replica(60003, be3.getId(), 0,
ReplicaState.NORMAL));
- invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD));
+ invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD, -1, -1));
invertedIndex.addReplica(70000, new Replica(70002, be2.getId(), 0,
ReplicaState.NORMAL));
invertedIndex.addReplica(70000, new Replica(70003, be3.getId(), 0,
ReplicaState.NORMAL));
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 88f02df4cb..756510de77 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -81,7 +81,7 @@ public class RebalancerTestUtil {
int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(),
- partition.getId(), baseIndex.getId(), schemaHash, medium);
+ partition.getId(), baseIndex.getId(), schemaHash, medium, -1,
-1);
Tablet tablet = new Tablet(tabletId);
// add tablet to olapTable
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
index 4460fcb31f..d933b80982 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
@@ -78,7 +78,7 @@ public class UnitTestUtil {
// index
MaterializedIndex index = new MaterializedIndex(indexId,
IndexState.NORMAL);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 0, TStorageMedium.HDD);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 0, TStorageMedium.HDD, -1, -1);
index.addTablet(tablet, tabletMeta);
tablet.addReplica(replica1);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
new file mode 100644
index 0000000000..6e16e9e945
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cluster.Cluster;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.thrift.TStorageMedium;
+
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class CooldownJobTest {
+
+ private static long jobId = 100L;
+ private static long dbId = 101L;
+ private static long tableId = 102L;
+ private static long partitionId = 103L;
+ private static long indexId = 104L;
+ private static long tabletId = 105L;
+ private static long replicaId = 106L;
+ private static long backendId = 107L;
+ private static long cooldownReplicaId = 106L;
+ private static long cooldownTerm = 109L;
+ private static long timeoutMs = 10000L;
+ private static Tablet tablet = new Tablet(tabletId);
+ private static Replica replica = new Replica(replicaId, backendId, 1,
Replica.ReplicaState.NORMAL);
+
+ private static CooldownConf cooldownConf = new CooldownConf(dbId, tableId,
partitionId, indexId, tabletId,
+ cooldownReplicaId, cooldownTerm);
+
+ private static List<CooldownConf> cooldownConfList = new LinkedList<>();
+
+ @Mocked
+ private EditLog editLog;
+
+ public static CooldownJob createCooldownJob() {
+ tablet.setCooldownReplicaId(cooldownReplicaId);
+ tablet.setCooldownTerm(cooldownTerm);
+ cooldownConfList.add(cooldownConf);
+ return new CooldownJob(jobId, cooldownConfList, timeoutMs);
+ }
+
+ @Before
+ public void setUp() {
+ Cluster testCluster = new Cluster("test_cluster", 0);
+ Database db = new Database(dbId, "db1");
+ db.setClusterName("test_cluster");
+ Env.getCurrentEnv().addCluster(testCluster);
+ Env.getCurrentEnv().unprotectCreateDb(db);
+ OlapTable table = new OlapTable(tableId, "testTable", new
ArrayList<>(), KeysType.DUP_KEYS,
+ new PartitionInfo(), null);
+ table.setId(tableId);
+ db.createTable(table);
+ MaterializedIndex baseIndex = new MaterializedIndex();
+ baseIndex.setIdForRestore(indexId);
+ Partition partition = new Partition(partitionId, "part1", baseIndex,
null);
+ table.addPartition(partition);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 1, TStorageMedium.HDD,
+ cooldownReplicaId, cooldownTerm);
+ baseIndex.addTablet(tablet, tabletMeta);
+ tablet.addReplica(replica);
+ Env.getCurrentEnv().setEditLog(editLog);
+ }
+
+ @Test
+ public void testPending() throws Exception {
+ CooldownJob cooldownJob = createCooldownJob();
+ cooldownJob.runPendingJob();
+ Assert.assertEquals(CooldownJob.JobState.SEND_CONF,
cooldownJob.jobState);
+ for (CooldownConf conf : cooldownJob.getCooldownConfList()) {
+ Assert.assertEquals(conf.getCooldownReplicaId(), replica.getId());
+ }
+ CooldownJob job1 = createCooldownJob();
+ job1.replay(cooldownJob);
+ Assert.assertEquals(CooldownJob.JobState.SEND_CONF, job1.jobState);
+ // run send job
+ cooldownJob.runSendJob();
+ Assert.assertEquals(CooldownJob.JobState.RUNNING,
cooldownJob.jobState);
+ // run replay finish job
+ cooldownJob.jobState = CooldownJob.JobState.FINISHED;
+ job1.replay(cooldownJob);
+ Assert.assertEquals(CooldownJob.JobState.FINISHED, job1.jobState);
+ }
+
+ @Test
+ public void testCancelJob() throws Exception {
+ CooldownJob cooldownJob = createCooldownJob();
+ cooldownJob.runPendingJob();
+ Assert.assertEquals(CooldownJob.JobState.SEND_CONF,
cooldownJob.jobState);
+ Assert.assertEquals(cooldownReplicaId, replica.getId());
+ // run send job
+ cooldownJob.runSendJob();
+ Assert.assertEquals(CooldownJob.JobState.RUNNING,
cooldownJob.jobState);
+ // run cancel job
+ cooldownJob.cancelImpl("test cancel");
+ Assert.assertEquals(CooldownJob.JobState.CANCELLED,
cooldownJob.jobState);
+ }
+
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
index e4310243c4..df8ed5a316 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
@@ -156,7 +156,8 @@ public abstract class DorisHttpTestCase {
// index
MaterializedIndex baseIndex = new MaterializedIndex(testIndexId,
MaterializedIndex.IndexState.NORMAL);
- TabletMeta tabletMeta = new TabletMeta(testDbId, testTableId,
testPartitionId, testIndexId, testSchemaHash, TStorageMedium.HDD);
+ TabletMeta tabletMeta = new TabletMeta(testDbId, testTableId,
testPartitionId, testIndexId, testSchemaHash,
+ TStorageMedium.HDD, -1, -1);
baseIndex.addTablet(tablet, tabletMeta);
tablet.addReplica(replica1);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 5c2c005972..e464517d7f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -116,7 +116,7 @@ public class DeleteHandlerTest {
auth = AccessTestUtil.fetchAdminAccess();
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
db = CatalogMocker.mockDb();
- TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID,
TBL_ID, 0, null);
+ TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID,
TBL_ID, 0, null, -1, -1);
invertedIndex.addTablet(TABLET_ID, tabletMeta);
invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_1,
BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_2,
BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index 38033224d6..3215c63d19 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -349,6 +349,16 @@ struct TPluginMetaInfo {
4: optional string source
}
+struct TCooldownConf {
+ 1: required Types.TTabletId tablet_id
+ 2: optional Types.TReplicaId cooldown_replica_id
+ 3: optional i64 cooldown_term
+}
+
+struct TPushCooldownConfReq {
+ 1: required list<TCooldownConf> cooldown_confs
+}
+
struct TAgentTaskRequest {
1: required TAgentServiceVersion protocol_version
2: required Types.TTaskType task_type
@@ -380,6 +390,7 @@ struct TAgentTaskRequest {
27: optional TCompactionReq compaction_req
28: optional TStorageMigrationReqV2 storage_migration_req_v2
29: optional TGetStoragePolicy update_policy
+ 30: optional TPushCooldownConfReq push_cooldown_conf
}
struct TAgentResult {
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index 7c9b923198..6852f21426 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -42,6 +42,9 @@ struct TTabletInfo {
15: optional Types.TReplicaId replica_id
// data size on remote storage
16: optional Types.TSize remote_data_size
+ 17: optional Types.TReplicaId cooldown_replica_id
+ 18: optional bool is_cooldown = false
+ 19: optional i64 cooldown_term = -1
}
struct TFinishTaskRequest {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 589b95f77c..1a92304b38 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -202,7 +202,8 @@ enum TTaskType {
UNINSTALL_PLUGIN,
COMPACTION,
STORAGE_MEDIUM_MIGRATE_V2,
- NOTIFY_UPDATE_STORAGE_POLICY
+ NOTIFY_UPDATE_STORAGE_POLICY,
+ PUSH_COOLDOWN_CONF
}
enum TStmtType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]