This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 58c520dbfd [Feature](remote) Cooldown cold data to object storage only 
one replica (#15832)
58c520dbfd is described below

commit 58c520dbfde1ce09c0a23a5ec2276ca687f6c0db
Author: pengxiangyu <[email protected]>
AuthorDate: Sat Jan 14 23:58:00 2023 +0800

    [Feature](remote) Cooldown cold data to object storage only one replica 
(#15832)
---
 be/src/agent/agent_server.cpp                      |  11 +
 be/src/agent/agent_server.h                        |   1 +
 be/src/agent/task_worker_pool.cpp                  |  44 +++
 be/src/agent/task_worker_pool.h                    |   6 +-
 be/src/http/action/pad_rowset_action.cpp           |   2 +
 be/src/io/fs/file_reader_options.h                 |  14 +-
 be/src/io/fs/remote_file_system.cpp                |   3 +-
 be/src/olap/delta_writer.cpp                       |   3 +
 be/src/olap/push_handler.cpp                       |   4 +
 be/src/olap/rowset/beta_rowset.cpp                 |  26 +-
 be/src/olap/rowset/beta_rowset.h                   |   2 +
 be/src/olap/rowset/beta_rowset_writer.cpp          |   4 +-
 be/src/olap/rowset/segment_v2/segment.cpp          |   3 +-
 be/src/olap/rowset/segment_v2/segment.h            |   1 +
 be/src/olap/tablet.cpp                             | 132 ++++++-
 be/src/olap/tablet.h                               |   4 +
 be/src/olap/tablet_meta.cpp                        |  18 +-
 be/src/olap/tablet_meta.h                          |  15 +
 be/test/io/cache/remote_file_cache_test.cpp        |   4 +-
 be/test/tools/benchmark_tool.cpp                   |   4 +-
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../doris/alter/MaterializedViewHandler.java       |   3 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |   4 +-
 .../apache/doris/alter/SchemaChangeHandler.java    |   4 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   4 +-
 .../java/org/apache/doris/backup/RestoreJob.java   |  15 +-
 .../apache/doris/catalog/CatalogRecycleBin.java    |   6 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  24 ++
 .../main/java/org/apache/doris/catalog/Tablet.java |  42 ++-
 .../apache/doris/catalog/TabletInvertedIndex.java  |  55 ++-
 .../java/org/apache/doris/catalog/TabletMeta.java  |  24 +-
 .../main/java/org/apache/doris/common/Config.java  |   7 +
 .../org/apache/doris/cooldown/CooldownConf.java    | 130 +++++++
 .../apache/doris/cooldown/CooldownException.java   |  32 ++
 .../org/apache/doris/cooldown/CooldownHandler.java | 185 ++++++++++
 .../org/apache/doris/cooldown/CooldownJob.java     | 397 +++++++++++++++++++++
 .../apache/doris/datasource/InternalCatalog.java   |  18 +-
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../java/org/apache/doris/master/MasterImpl.java   |  10 +
 .../org/apache/doris/master/ReportHandler.java     |  10 +-
 .../java/org/apache/doris/persist/EditLog.java     |   9 +
 .../org/apache/doris/persist/OperationType.java    |   2 +
 .../doris/persist/meta/MetaPersistMethod.java      |   6 +
 .../doris/persist/meta/PersistMetaModules.java     |   2 +-
 .../java/org/apache/doris/task/AgentBatchTask.java |  10 +
 .../apache/doris/task/PushCooldownConfTask.java    |  52 +++
 .../org/apache/doris/backup/CatalogMocker.java     |  10 +-
 .../org/apache/doris/catalog/CatalogTestUtil.java  |   5 +-
 .../java/org/apache/doris/catalog/TabletTest.java  |   2 +-
 .../doris/clone/ClusterLoadStatisticsTest.java     |   6 +-
 .../org/apache/doris/clone/RebalancerTestUtil.java |   2 +-
 .../org/apache/doris/common/util/UnitTestUtil.java |   2 +-
 .../org/apache/doris/cooldown/CooldownJobTest.java | 130 +++++++
 .../org/apache/doris/http/DorisHttpTestCase.java   |   3 +-
 .../org/apache/doris/load/DeleteHandlerTest.java   |   2 +-
 gensrc/thrift/AgentService.thrift                  |  11 +
 gensrc/thrift/MasterService.thrift                 |   3 +
 gensrc/thrift/Types.thrift                         |   3 +-
 58 files changed, 1459 insertions(+), 82 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index d57777b6e6..c0138a08e1 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -69,6 +69,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
 
     CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
     CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+
     // Both PUSH and REALTIME_PUSH type use _push_workers
     CREATE_AND_START_POOL(PUSH, _push_workers);
     CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
@@ -84,6 +85,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
     CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
     CREATE_AND_START_POOL(MOVE, _move_dir_workers);
     CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, 
_update_tablet_meta_info_workers);
+    CREATE_AND_START_THREAD(PUSH_COOLDOWN_CONF, _push_cooldown_conf_workers);
 
     CREATE_AND_START_THREAD(REPORT_TASK, _report_task_workers);
     CREATE_AND_START_THREAD(REPORT_DISK_STATE, _report_disk_state_workers);
@@ -183,6 +185,15 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
                         signature);
             }
             break;
+        case TTaskType::PUSH_COOLDOWN_CONF:
+            if (task.__isset.push_cooldown_conf) {
+                _push_cooldown_conf_workers->submit_task(task);
+            } else {
+                ret_st = Status::InvalidArgument(
+                        "task(signature={}) has wrong request member = 
push_cooldown_conf",
+                        signature);
+            }
+            break;
         default:
             ret_st = Status::InvalidArgument("task(signature={}, type={}) has 
wrong task type",
                                              signature, task_type);
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index bfb819dd97..3914f3a236 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -60,6 +60,7 @@ private:
     std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
     std::unique_ptr<TaskWorkerPool> _delete_workers;
     std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
+    std::unique_ptr<TaskWorkerPool> _push_cooldown_conf_workers;
     std::unique_ptr<TaskWorkerPool> _clone_workers;
     std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
     std::unique_ptr<TaskWorkerPool> _check_consistency_workers;
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 15a3914e6d..29bfd3631b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -204,6 +204,10 @@ void TaskWorkerPool::start() {
         cb = 
std::bind<void>(&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback,
                              this);
         break;
+    case TaskWorkerType::PUSH_COOLDOWN_CONF:
+        _worker_count = 1;
+        cb = 
std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, 
this);
+        break;
     default:
         // pass
         break;
@@ -1693,4 +1697,44 @@ void 
TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
     }
 }
 
+void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        TPushCooldownConfReq push_cooldown_conf_req;
+        {
+            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+            while (_is_work && _tasks.empty()) {
+                _worker_thread_condition_variable.wait(worker_thread_lock);
+            }
+
+            agent_task_req = _tasks.front();
+            push_cooldown_conf_req = agent_task_req.push_cooldown_conf;
+            _tasks.pop_front();
+        }
+        for (auto cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
+            int64_t tablet_id = cooldown_conf.tablet_id;
+            TabletSharedPtr tablet =
+                    
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+            if (tablet.get() == nullptr) {
+                std::stringstream ss;
+                ss << "failed to get tablet. tablet_id=" << tablet_id;
+                LOG(WARNING) << ss.str();
+                continue;
+            }
+            if (cooldown_conf.cooldown_term > 
tablet->tablet_meta()->cooldown_term()) {
+                tablet->tablet_meta()->set_cooldown_replica_id_and_term(
+                        cooldown_conf.cooldown_replica_id, 
cooldown_conf.cooldown_term);
+                LOG(INFO) << "push_cooldown_conf successfully. tablet_id=" << 
tablet_id
+                          << ", cooldown_conf: " << 
cooldown_conf.cooldown_replica_id << "("
+                          << cooldown_conf.cooldown_term << ")";
+            } else {
+                LOG(WARNING) << "push_cooldown_conf failed. tablet_id=" << 
tablet_id
+                             << ", cooldown_term: " << 
tablet->tablet_meta()->cooldown_term()
+                             << " -> " << cooldown_conf.cooldown_term;
+            }
+        }
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index ae8b9e8149..f94d8d2ad8 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -67,7 +67,8 @@ public:
         UPDATE_TABLET_META_INFO,
         SUBMIT_TABLE_COMPACTION,
         REFRESH_STORAGE_POLICY,
-        UPDATE_STORAGE_POLICY
+        UPDATE_STORAGE_POLICY,
+        PUSH_COOLDOWN_CONF
     };
 
     enum ReportType { TASK, DISK, TABLET };
@@ -127,6 +128,8 @@ public:
             return "REFRESH_STORAGE_POLICY";
         case UPDATE_STORAGE_POLICY:
             return "UPDATE_STORAGE_POLICY";
+        case PUSH_COOLDOWN_CONF:
+            return "PUSH_COOLDOWN_CONF";
         default:
             return "Unknown";
         }
@@ -192,6 +195,7 @@ private:
     void _submit_table_compaction_worker_thread_callback();
     void _storage_refresh_storage_policy_worker_thread_callback();
     void _storage_update_storage_policy_worker_thread_callback();
+    void _push_cooldown_conf_worker_thread_callback();
 
     void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t 
signature,
                        const TTaskType::type task_type, TFinishTaskRequest* 
finish_task_request);
diff --git a/be/src/http/action/pad_rowset_action.cpp 
b/be/src/http/action/pad_rowset_action.cpp
index da513750a9..225fec4dd2 100644
--- a/be/src/http/action/pad_rowset_action.cpp
+++ b/be/src/http/action/pad_rowset_action.cpp
@@ -91,6 +91,8 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet, 
const Version& versi
     ctx.rowset_state = VISIBLE;
     ctx.segments_overlap = NONOVERLAPPING;
     ctx.tablet_schema = tablet->tablet_schema();
+    ctx.oldest_write_timestamp = UnixSeconds();
+    ctx.newest_write_timestamp = UnixSeconds();
     RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer));
     auto rowset = writer->build();
     rowset->make_visible(version);
diff --git a/be/src/io/fs/file_reader_options.h 
b/be/src/io/fs/file_reader_options.h
index f7cc0d13ab..f5a6eaa8f3 100644
--- a/be/src/io/fs/file_reader_options.h
+++ b/be/src/io/fs/file_reader_options.h
@@ -44,17 +44,19 @@ public:
 class NoCachePathPolicy : public CachePathPolicy {
 public:
     NoCachePathPolicy() = default;
-    std::string get_cache_path(const std::string& path) const override { 
return path; }
+    std::string get_cache_path(const std::string& path) const override { 
return ""; }
 };
 
 class SegmentCachePathPolicy : public CachePathPolicy {
 public:
     SegmentCachePathPolicy() = default;
-    std::string get_cache_path(const std::string& path) const override {
-        // the segment file path is 
{rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}.dat
-        // cache path is: {rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}/
-        return path.substr(0, path.size() - 4) + "/";
-    }
+
+    void set_cache_path(const std::string& cache_path) { _cache_path = 
cache_path; }
+
+    std::string get_cache_path(const std::string& path) const override { 
return _cache_path; }
+
+private:
+    std::string _cache_path;
 };
 
 class FileBlockCachePathPolicy : public CachePathPolicy {
diff --git a/be/src/io/fs/remote_file_system.cpp 
b/be/src/io/fs/remote_file_system.cpp
index a2bfb93d66..563cc79fba 100644
--- a/be/src/io/fs/remote_file_system.cpp
+++ b/be/src/io/fs/remote_file_system.cpp
@@ -36,8 +36,7 @@ Status RemoteFileSystem::open_file(const Path& path, const 
FileReaderOptions& re
     }
     case io::FileCachePolicy::SUB_FILE_CACHE:
     case io::FileCachePolicy::WHOLE_FILE_CACHE: {
-        StringPiece str(path.native());
-        std::string cache_path = 
reader_options.path_policy.get_cache_path(str.as_string());
+        std::string cache_path = 
reader_options.path_policy.get_cache_path(path.native());
         io::FileCachePtr cache_reader = 
FileCacheManager::instance()->new_file_cache(
                 cache_path, config::file_cache_alive_time_sec, raw_reader,
                 reader_options.cache_type);
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index cd7178e1b4..c001b67cad 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -138,6 +138,9 @@ Status DeltaWriter::init() {
     context.rowset_state = PREPARED;
     context.segments_overlap = OVERLAPPING;
     context.tablet_schema = _tablet_schema;
+    context.oldest_write_timestamp = UnixSeconds();
+    context.newest_write_timestamp = UnixSeconds();
+
     RETURN_NOT_OK(_tablet->create_rowset_writer(context, &_rowset_writer));
     _schema.reset(new Schema(_tablet_schema));
     _reset_mem_table();
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 55472c6d82..ad3bb25e79 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -203,6 +203,8 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, 
RowsetSharedPtr* cur
         context.rowset_state = PREPARED;
         context.segments_overlap = OVERLAP_UNKNOWN;
         context.tablet_schema = tablet_schema;
+        context.oldest_write_timestamp = UnixSeconds();
+        context.newest_write_timestamp = UnixSeconds();
         res = cur_tablet->create_rowset_writer(context, &rowset_writer);
         if (!res.ok()) {
             LOG(WARNING) << "failed to init rowset writer, tablet=" << 
cur_tablet->full_name()
@@ -366,6 +368,8 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, 
RowsetSharedPtr* cur_ro
         context.rowset_state = PREPARED;
         context.segments_overlap = OVERLAP_UNKNOWN;
         context.tablet_schema = tablet_schema;
+        context.oldest_write_timestamp = UnixSeconds();
+        context.newest_write_timestamp = UnixSeconds();
         res = cur_tablet->create_rowset_writer(context, &rowset_writer);
         if (!res.ok()) {
             LOG(WARNING) << "failed to init rowset writer, tablet=" << 
cur_tablet->full_name()
diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index e15725d811..300a022078 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -73,6 +73,11 @@ std::string BetaRowset::remote_tablet_path(int64_t 
tablet_id) {
     return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
 }
 
+std::string BetaRowset::remote_tablet_meta_path(int64_t tablet_id, int64_t 
replica_id) {
+    // data/{tablet_id}/{replica_id}.meta
+    return fmt::format("{}/{}.meta", remote_tablet_path(tablet_id), 
replica_id);
+}
+
 std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& 
rowset_id,
                                             int segment_id) {
     // data/{tablet_id}/{rowset_id}_{seg_num}.dat
@@ -135,7 +140,12 @@ Status 
BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segm
     for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
         auto seg_path = segment_file_path(seg_id);
         std::shared_ptr<segment_v2::Segment> segment;
-        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), 
_schema, &segment);
+        io::SegmentCachePathPolicy cache_policy;
+        cache_policy.set_cache_path(segment_cache_path(seg_id));
+        io::FileReaderOptions 
reader_options(io::cache_type_from_string(config::file_cache_type),
+                                             cache_policy);
+        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), 
_schema,
+                                           reader_options, &segment);
         if (!s.ok()) {
             LOG(WARNING) << "failed to open segment. " << seg_path << " under 
rowset "
                          << unique_id() << " : " << s.to_string();
@@ -157,7 +167,12 @@ Status BetaRowset::load_segments(int64_t seg_id_begin, 
int64_t seg_id_end,
         }
         auto seg_path = segment_file_path(seg_id);
         std::shared_ptr<segment_v2::Segment> segment;
-        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), 
_schema, &segment);
+        io::SegmentCachePathPolicy cache_policy;
+        cache_policy.set_cache_path(segment_cache_path(seg_id));
+        io::FileReaderOptions 
reader_options(io::cache_type_from_string(config::file_cache_type),
+                                             cache_policy);
+        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), 
_schema,
+                                           reader_options, &segment);
         if (!s.ok()) {
             LOG(WARNING) << "failed to open segment. " << seg_path << " under 
rowset "
                          << unique_id() << " : " << s.to_string();
@@ -402,7 +417,12 @@ bool BetaRowset::check_current_rowset_segment() {
     for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
         auto seg_path = segment_file_path(seg_id);
         std::shared_ptr<segment_v2::Segment> segment;
-        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), 
_schema, &segment);
+        io::SegmentCachePathPolicy cache_policy;
+        cache_policy.set_cache_path(segment_cache_path(seg_id));
+        io::FileReaderOptions 
reader_options(io::cache_type_from_string(config::file_cache_type),
+                                             cache_policy);
+        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), 
_schema,
+                                           reader_options, &segment);
         if (!s.ok()) {
             LOG(WARNING) << "segment can not be opened. file=" << seg_path;
             return false;
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 5ac860eaa1..83652d4e57 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -63,6 +63,8 @@ public:
 
     static std::string remote_tablet_path(int64_t tablet_id);
 
+    static std::string remote_tablet_meta_path(int64_t tablet_id, int64_t 
replica_id);
+
     Status split_range(const RowCursor& start_key, const RowCursor& end_key,
                        uint64_t request_block_row_count, size_t key_num,
                        std::vector<OlapTuple>* ranges) override;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 6b72b7c9c6..7e3fea581a 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -385,8 +385,10 @@ Status BetaRowsetWriter::_load_noncompacted_segments(
         auto seg_path =
                 BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, seg_id);
         std::shared_ptr<segment_v2::Segment> segment;
+        io::FileReaderOptions 
reader_options(io::cache_type_from_string(config::file_cache_type),
+                                             io::SegmentCachePathPolicy());
         auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
-                                           _context.tablet_schema, &segment);
+                                           _context.tablet_schema, 
reader_options, &segment);
         if (!s.ok()) {
             LOG(WARNING) << "failed to open segment. " << seg_path << ":" << 
s.to_string();
             return Status::Error<ROWSET_LOAD_FAILED>();
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index 15ed949025..b99ea4cff6 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -46,9 +46,8 @@ using io::FileCacheManager;
 
 Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t 
segment_id,
                      RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
+                     const io::FileReaderOptions& reader_options,
                      std::shared_ptr<Segment>* output) {
-    io::FileReaderOptions 
reader_options(io::cache_type_from_string(config::file_cache_type),
-                                         io::SegmentCachePathPolicy());
     io::FileReaderSPtr file_reader;
 #ifndef BE_TEST
     RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader, 
nullptr));
diff --git a/be/src/olap/rowset/segment_v2/segment.h 
b/be/src/olap/rowset/segment_v2/segment.h
index 70d3450901..1ec4943cf2 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -63,6 +63,7 @@ class Segment : public std::enable_shared_from_this<Segment> {
 public:
     static Status open(io::FileSystemSPtr fs, const std::string& path, 
uint32_t segment_id,
                        RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
+                       const io::FileReaderOptions& reader_options,
                        std::shared_ptr<Segment>* output);
 
     ~Segment();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a67ace80a5..c4b0f835aa 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -75,6 +75,7 @@ using std::nothrow;
 using std::sort;
 using std::string;
 using std::vector;
+using io::FileSystemSPtr;
 
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, 
MetricUnit::OPERATIONS);
@@ -1438,6 +1439,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* 
tablet_info,
     
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
     tablet_info->__set_replica_id(replica_id());
     tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size());
+    tablet_info->__set_is_cooldown(!_tablet_meta->storage_policy().empty());
+    if (tablet_info->is_cooldown) {
+        
tablet_info->__set_cooldown_replica_id(_tablet_meta->cooldown_replica_id());
+    }
 }
 
 // should use this method to get a copy of current tablet meta
@@ -1598,6 +1603,8 @@ Status Tablet::create_initial_rowset(const int64_t 
req_version) {
         context.rowset_state = VISIBLE;
         context.segments_overlap = OVERLAP_UNKNOWN;
         context.tablet_schema = tablet_schema();
+        context.oldest_write_timestamp = UnixSeconds();
+        context.newest_write_timestamp = UnixSeconds();
         res = create_rowset_writer(context, &rs_writer);
 
         if (!res.ok()) {
@@ -1683,11 +1690,22 @@ Status Tablet::cooldown() {
         LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" << 
tablet_id();
         return Status::Error<TRY_LOCK_FAILED>();
     }
+
+    if (_tablet_meta->cooldown_replica_id() == _tablet_meta->replica_id()) {
+        RETURN_IF_ERROR(_cooldown_data());
+    } else {
+        RETURN_IF_ERROR(_follow_cooldowned_data());
+    }
+    return Status::OK();
+}
+
+Status Tablet::_cooldown_data() {
     auto dest_fs = io::FileSystemMap::instance()->get(storage_policy());
     if (!dest_fs) {
         return Status::Error<UNINITIALIZED>();
     }
     DCHECK(dest_fs->type() == io::FileSystemType::S3);
+
     auto old_rowset = pick_cooldown_rowset();
     if (!old_rowset) {
         LOG(WARNING) << "Cannot pick cooldown rowset in tablet " << 
tablet_id();
@@ -1717,27 +1735,125 @@ Status Tablet::cooldown() {
     new_rowset_meta->set_resource_id(dest_fs->resource_id());
     new_rowset_meta->set_fs(dest_fs);
     new_rowset_meta->set_creation_time(time(nullptr));
+
+    // write remote tablet meta
+    TabletMetaPB remote_tablet_meta_pb;
+    _tablet_meta->to_meta_pb(true, &remote_tablet_meta_pb);
+    new_rowset_meta->to_rowset_pb(remote_tablet_meta_pb.add_rs_metas());
+    // upload rowset_meta to remote fs.
+    RETURN_IF_ERROR(_write_remote_tablet_meta(dest_fs, remote_tablet_meta_pb));
+
     RowsetSharedPtr new_rowset;
     RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, 
&new_rowset);
 
     std::vector to_add {std::move(new_rowset)};
     std::vector to_delete {std::move(old_rowset)};
 
-    bool has_shutdown = false;
     {
         std::unique_lock meta_wlock(_meta_lock);
-        has_shutdown = tablet_state() == TABLET_SHUTDOWN;
-        if (!has_shutdown) {
+        if (tablet_state() != TABLET_SHUTDOWN) {
             modify_rowsets(to_add, to_delete);
-            _self_owned_remote_rowsets.insert(to_add.front());
             save_meta();
         }
     }
-    if (has_shutdown) {
-        record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(),
-                                    to_add.front()->num_segments());
-        return Status::Aborted("tablet {} has shutdown", tablet_id());
+    return Status::OK();
+}
+
+Status Tablet::_read_remote_tablet_meta(FileSystemSPtr fs, TabletMetaPB* 
tablet_meta_pb) {
+    std::string remote_meta_path =
+            BetaRowset::remote_tablet_meta_path(tablet_id(), 
_tablet_meta->cooldown_replica_id());
+    bool exist = false;
+    RETURN_IF_ERROR(fs->exists(remote_meta_path, &exist));
+    if (exist) {
+        IOContext io_ctx;
+        io::FileReaderSPtr tablet_meta_reader;
+        RETURN_IF_ERROR(fs->open_file(remote_meta_path, &tablet_meta_reader, 
&io_ctx));
+        if (tablet_meta_reader == nullptr) {
+            return Status::InternalError("tablet_meta_reader is null");
+        }
+        auto file_size = tablet_meta_reader->size();
+        size_t bytes_read;
+        uint8_t* buf = new uint8_t[file_size];
+        Slice slice(buf, file_size);
+        Status st = tablet_meta_reader->read_at(0, slice, io_ctx, &bytes_read);
+        if (!st.ok()) {
+            tablet_meta_reader->close();
+            return st;
+        }
+        tablet_meta_reader->close();
+        if (!tablet_meta_pb->ParseFromString(slice.to_string())) {
+            LOG(WARNING) << "parse tablet meta failed";
+            return Status::InternalError("parse tablet meta failed");
+        }
+    }
+    LOG(INFO) << "No tablet meta file founded, init needed. tablet_id: " << 
tablet_id();
+    return Status::InternalError("No tablet meta file founded, init needed.");
+}
+
+Status Tablet::_write_remote_tablet_meta(FileSystemSPtr fs, const 
TabletMetaPB& tablet_meta_pb) {
+    std::string remote_meta_path =
+            BetaRowset::remote_tablet_meta_path(tablet_id(), 
_tablet_meta->replica_id());
+    io::FileWriterPtr tablet_meta_writer;
+    RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
+    if (tablet_meta_writer == nullptr) {
+        return Status::InternalError("tablet_meta_writer is null");
+    }
+    string value;
+    tablet_meta_pb.SerializeToString(&value);
+    uint8_t* buf = new uint8_t[value.size()];
+    memcpy(buf, value.c_str(), value.size());
+    Slice slice(buf, value.size());
+    Status st = tablet_meta_writer->appendv(&slice, 1);
+    if (!st.ok()) {
+        tablet_meta_writer->close();
+        return st;
     }
+    tablet_meta_writer->close();
+    return Status::OK();
+}
+
+Status Tablet::_follow_cooldowned_data() {
+    auto dest_fs = io::FileSystemMap::instance()->get(storage_policy());
+    if (!dest_fs) {
+        return Status::InternalError("storage_policy doesn't exist: " + 
storage_policy());
+    }
+    DCHECK(dest_fs->type() == io::FileSystemType::S3);
+    TabletMetaPB remote_tablet_meta_pb;
+    RETURN_IF_ERROR(_read_remote_tablet_meta(dest_fs, &remote_tablet_meta_pb));
+    int64_t max_version = -1;
+    for (auto& rowset_meta_pb : remote_tablet_meta_pb.rs_metas()) {
+        if (max_version < rowset_meta_pb.end_version()) {
+            max_version = rowset_meta_pb.end_version();
+        }
+    }
+
+    std::vector<RowsetSharedPtr> to_add;
+    std::vector<RowsetSharedPtr> to_delete;
+    for (auto& rowset_meta_pb : remote_tablet_meta_pb.rs_metas()) {
+        if (rowset_meta_pb.end_version() > max_version) {
+            continue;
+        }
+        RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
+        rowset_meta->init_from_pb(rowset_meta_pb);
+        RowsetSharedPtr new_rowset;
+        RowsetFactory::create_rowset(_schema, _tablet_path, rowset_meta, 
&new_rowset);
+        to_add.push_back(new_rowset);
+    }
+
+    {
+        std::shared_lock meta_rlock(_meta_lock);
+        for (const auto& it : _rs_version_map) {
+            auto& rs = it.second;
+            if (rs->end_version() <= max_version) {
+                to_delete.push_back(rs);
+            }
+        }
+        if (tablet_state() != TABLET_SHUTDOWN) {
+            modify_rowsets(to_add, to_delete);
+            save_meta();
+        }
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c82fa461a1..e10b9628ac 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -379,6 +379,10 @@ private:
                                 RowsetIdUnorderedSet* to_add, 
RowsetIdUnorderedSet* to_del);
     Status _load_rowset_segments(const RowsetSharedPtr& rowset,
                                  std::vector<segment_v2::SegmentSharedPtr>* 
segments);
+    Status _cooldown_data();
+    Status _follow_cooldowned_data();
+    Status _read_remote_tablet_meta(io::FileSystemSPtr fs, TabletMetaPB* 
tablet_meta_pb);
+    Status _write_remote_tablet_meta(io::FileSystemSPtr fs, const 
TabletMetaPB& tablet_meta_pb);
 
 public:
     static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index e3d4c6f3ff..936817bca6 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -253,6 +253,8 @@ TabletMeta::TabletMeta(const TabletMeta& b)
           _in_restore_mode(b._in_restore_mode),
           _preferred_rowset_type(b._preferred_rowset_type),
           _storage_policy(b._storage_policy),
+          _cooldown_replica_id(b._cooldown_replica_id),
+          _cooldown_term(b._cooldown_term),
           
_enable_unique_key_merge_on_write(b._enable_unique_key_merge_on_write),
           _delete_bitmap(b._delete_bitmap) {};
 
@@ -521,6 +523,8 @@ void TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
     }
 
     _storage_policy = tablet_meta_pb.storage_policy();
+    _cooldown_replica_id = -1;
+    _cooldown_term = -1;
     if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
         _enable_unique_key_merge_on_write = 
tablet_meta_pb.enable_unique_key_merge_on_write();
     }
@@ -544,6 +548,10 @@ void TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
 }
 
 void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
+    to_meta_pb(false, tablet_meta_pb);
+}
+
+void TabletMeta::to_meta_pb(bool only_include_remote_rowset, TabletMetaPB* 
tablet_meta_pb) {
     tablet_meta_pb->set_table_id(table_id());
     tablet_meta_pb->set_partition_id(partition_id());
     tablet_meta_pb->set_tablet_id(tablet_id());
@@ -573,7 +581,9 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
     }
 
     for (auto& rs : _rs_metas) {
-        rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
+        if ((only_include_remote_rowset && !rs->is_local()) || 
!only_include_remote_rowset) {
+            rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
+        }
     }
     for (auto rs : _stale_rs_metas) {
         rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas());
@@ -866,6 +876,12 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
     if (a._in_restore_mode != b._in_restore_mode) return false;
     if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
     if (a._storage_policy != b._storage_policy) return false;
+    if (a._cooldown_replica_id != b._cooldown_replica_id) {
+        return false;
+    }
+    if (a._cooldown_term != b._cooldown_term) {
+        return false;
+    }
     return true;
 }
 
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 3655b71602..f287662ad2 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -114,6 +114,7 @@ public:
     void init_rs_metas_fs(const io::FileSystemSPtr& fs);
 
     void to_meta_pb(TabletMetaPB* tablet_meta_pb);
+    void to_meta_pb(bool only_include_remote_rowset, TabletMetaPB* 
tablet_meta_pb);
     void to_json(std::string* json_string, json2pb::Pb2JsonOptions& options);
     uint32_t mem_size() const;
 
@@ -200,6 +201,18 @@ public:
         _storage_policy = policy;
     }
 
+    const int64_t cooldown_replica_id() const { return _cooldown_replica_id; }
+
+    void set_cooldown_replica_id_and_term(int64_t cooldown_replica_id, int64_t 
cooldown_term) {
+        VLOG_NOTICE << "set tablet_id : " << _table_id << " 
cooldown_replica_id from "
+                    << _cooldown_replica_id << " to " << cooldown_replica_id
+                    << ", cooldown_term from " << _cooldown_term << " to " << 
cooldown_term;
+        _cooldown_replica_id = cooldown_replica_id;
+        _cooldown_term = cooldown_term;
+    }
+
+    const int64_t cooldown_term() const { return _cooldown_term; }
+
     static void init_column_from_tcolumn(uint32_t unique_id, const TColumn& 
tcolumn,
                                          ColumnPB* column);
 
@@ -243,6 +256,8 @@ private:
     RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
 
     std::string _storage_policy;
+    int64_t _cooldown_replica_id = -1;
+    int64_t _cooldown_term = -1;
 
     // For unique key data model, the feature Merge-on-Write will leverage a 
primary
     // key index and a delete-bitmap to mark duplicate keys as deleted in load 
stage,
diff --git a/be/test/io/cache/remote_file_cache_test.cpp 
b/be/test/io/cache/remote_file_cache_test.cpp
index 176e306a8a..1a1ad93750 100644
--- a/be/test/io/cache/remote_file_cache_test.cpp
+++ b/be/test/io/cache/remote_file_cache_test.cpp
@@ -141,7 +141,9 @@ protected:
         EXPECT_NE("", writer.min_encoded_key().to_string());
         EXPECT_NE("", writer.max_encoded_key().to_string());
 
-        st = segment_v2::Segment::open(fs, path, 0, {}, query_schema, res);
+        io::FileReaderOptions reader_options(io::FileCachePolicy::NO_CACHE,
+                                             io::SegmentCachePathPolicy());
+        st = segment_v2::Segment::open(fs, path, 0, {}, query_schema, 
reader_options, res);
         EXPECT_TRUE(st.ok());
         EXPECT_EQ(nrows, (*res)->num_rows());
     }
diff --git a/be/test/tools/benchmark_tool.cpp b/be/test/tools/benchmark_tool.cpp
index 1bc3decb4b..b408570a35 100644
--- a/be/test/tools/benchmark_tool.cpp
+++ b/be/test/tools/benchmark_tool.cpp
@@ -363,7 +363,9 @@ public:
         writer.finalize(&file_size, &index_size);
         file_writer->close();
 
-        Segment::open(fs, path, "", seg_id, {}, &_tablet_schema, res);
+        io::FileReaderOptions reader_options(io::FileCachePolicy::NO_CACHE,
+                                             io::SegmentCachePathPolicy());
+        Segment::open(fs, path, seg_id, {}, &_tablet_schema, reader_options, 
res);
     }
 
     std::vector<std::vector<std::string>> generate_dataset(int rows_number) {
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 1e28dd1099..2ef346a4f3 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -48,8 +48,10 @@ public final class FeMetaVersion {
     public static final int VERSION_113 = 113;
     // add new recover info for recover ddl
     public static final int VERSION_114 = 114;
+    // change replica meta to json
+    public static final int VERSION_115 = 115;
     // note: when increment meta version, should assign the latest version to 
VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_114;
+    public static final int VERSION_CURRENT = VERSION_115;
 
     // all logs meta version should >= the minimum version, so that we could 
remove many if clause, for example
     // if (FE_METAVERSION < VERSION_94) ...
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index f6624c0b81..77568a1109 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -376,9 +376,10 @@ public class MaterializedViewHandler extends AlterHandler {
             // index state is SHADOW
             MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, 
IndexState.SHADOW);
             MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
-            TabletMeta mvTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, mvIndexId, mvSchemaHash, medium);
             short replicationNum = 
olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
             for (Tablet baseTablet : baseIndex.getTablets()) {
+                TabletMeta mvTabletMeta = new TabletMeta(
+                        dbId, tableId, partitionId, mvIndexId, mvSchemaHash, 
medium, -1, 0);
                 long baseTabletId = baseTablet.getId();
                 long mvTabletId = idGeneratorBuffer.getNextId();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index c600c15f6d..78e17db6dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -654,10 +654,10 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
         for (Long partitionId : partitionIdToRollupIndex.keySet()) {
             MaterializedIndex rollupIndex = 
partitionIdToRollupIndex.get(partitionId);
             TStorageMedium medium = 
tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
-            TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, rollupIndexId,
-                    rollupSchemaHash, medium);
 
             for (Tablet rollupTablet : rollupIndex.getTablets()) {
+                TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, rollupIndexId,
+                        rollupSchemaHash, medium, -1, 0);
                 invertedIndex.addTablet(rollupTablet.getId(), 
rollupTabletMeta);
                 for (Replica rollupReplica : rollupTablet.getReplicas()) {
                     invertedIndex.addReplica(rollupTablet.getId(), 
rollupReplica);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index d009aae41e..8429d9e975 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1486,11 +1486,11 @@ public class SchemaChangeHandler extends AlterHandler {
                 // index state is SHADOW
                 MaterializedIndex shadowIndex = new 
MaterializedIndex(shadowIndexId, IndexState.SHADOW);
                 MaterializedIndex originIndex = 
partition.getIndex(originIndexId);
-                TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, shadowIndexId, newSchemaHash,
-                        medium);
                 ReplicaAllocation replicaAlloc = 
olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
                 Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
                 for (Tablet originTablet : originIndex.getTablets()) {
+                    TabletMeta shadowTabletMeta = new TabletMeta(dbId, 
tableId, partitionId, shadowIndexId,
+                            newSchemaHash, medium, -1, 0);
                     long originTabletId = originTablet.getId();
                     long shadowTabletId = idGeneratorBuffer.getNextId();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 51321cac8e..e3a303b2ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -742,10 +742,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 MaterializedIndex shadowIndex = cell.getValue();
 
                 TStorageMedium medium = 
olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
-                TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, shadowIndexId,
-                        
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium);
 
                 for (Tablet shadownTablet : shadowIndex.getTablets()) {
+                    TabletMeta shadowTabletMeta = new TabletMeta(dbId, 
tableId, partitionId, shadowIndexId,
+                            
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium, -1, 0);
                     invertedIndex.addTablet(shadownTablet.getId(), 
shadowTabletMeta);
                     for (Replica shadowReplica : shadownTablet.getReplicas()) {
                         invertedIndex.addReplica(shadownTablet.getId(), 
shadowReplica);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index dad2bf3b11..154e86aa15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -985,9 +985,10 @@ public class RestoreJob extends AbstractJob {
         double bfFpp = localTbl.getBfFpp();
         for (MaterializedIndex restoredIdx : 
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
             MaterializedIndexMeta indexMeta = 
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
-            TabletMeta tabletMeta = new TabletMeta(db.getId(), 
localTbl.getId(), restorePart.getId(),
-                    restoredIdx.getId(), indexMeta.getSchemaHash(), 
TStorageMedium.HDD);
             for (Tablet restoreTablet : restoredIdx.getTablets()) {
+                TabletMeta tabletMeta = new TabletMeta(db.getId(), 
localTbl.getId(), restorePart.getId(),
+                        restoredIdx.getId(), indexMeta.getSchemaHash(), 
TStorageMedium.HDD,
+                        restoreTablet.getCooldownReplicaId(), 
restoreTablet.getCooldownTerm());
                 Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), 
tabletMeta);
                 for (Replica restoreReplica : restoreTablet.getReplicas()) {
                     
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1172,9 +1173,10 @@ public class RestoreJob extends AbstractJob {
             // modify tablet inverted index
             for (MaterializedIndex restoreIdx : 
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
                 int schemaHash = 
localTbl.getSchemaHashByIndexId(restoreIdx.getId());
-                TabletMeta tabletMeta = new TabletMeta(db.getId(), 
localTbl.getId(), restorePart.getId(),
-                        restoreIdx.getId(), schemaHash, TStorageMedium.HDD);
                 for (Tablet restoreTablet : restoreIdx.getTablets()) {
+                    TabletMeta tabletMeta = new TabletMeta(db.getId(), 
localTbl.getId(), restorePart.getId(),
+                            restoreIdx.getId(), schemaHash, 
TStorageMedium.HDD, restoreTablet.getCooldownReplicaId(),
+                            restoreTablet.getCooldownTerm());
                     
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
                     for (Replica restoreReplica : restoreTablet.getReplicas()) 
{
                         
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1204,9 +1206,10 @@ public class RestoreJob extends AbstractJob {
                 for (Partition restorePart : olapRestoreTbl.getPartitions()) {
                     for (MaterializedIndex restoreIdx : 
restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
                         int schemaHash = 
olapRestoreTbl.getSchemaHashByIndexId(restoreIdx.getId());
-                        TabletMeta tabletMeta = new TabletMeta(db.getId(), 
restoreTbl.getId(), restorePart.getId(),
-                                restoreIdx.getId(), schemaHash, 
TStorageMedium.HDD);
                         for (Tablet restoreTablet : restoreIdx.getTablets()) {
+                            TabletMeta tabletMeta = new TabletMeta(db.getId(), 
restoreTbl.getId(), restorePart.getId(),
+                                    restoreIdx.getId(), schemaHash, 
TStorageMedium.HDD,
+                                    restoreTablet.getCooldownReplicaId(), 
restoreTablet.getCooldownTerm());
                             
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
                             for (Replica restoreReplica : 
restoreTablet.getReplicas()) {
                                 
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index 011df7eb15..ec2588ae2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -839,8 +839,9 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
                 for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                     long indexId = index.getId();
                     int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-                    TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium);
                     for (Tablet tablet : index.getTablets()) {
+                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium,
+                                tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
                         long tabletId = tablet.getId();
                         invertedIndex.addTablet(tabletId, tabletMeta);
                         for (Replica replica : tablet.getReplicas()) {
@@ -891,8 +892,9 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
             for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                 long indexId = index.getId();
                 int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-                TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium);
                 for (Tablet tablet : index.getTablets()) {
+                    TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium,
+                            tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
                     long tabletId = tablet.getId();
                     invertedIndex.addTablet(tabletId, tabletMeta);
                     for (Replica replica : tablet.getReplicas()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 83bde25845..112360a3b3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -121,6 +121,7 @@ import org.apache.doris.common.util.SmallFileMgr;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.consistency.ConsistencyChecker;
+import org.apache.doris.cooldown.CooldownHandler;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.CatalogMgr;
 import org.apache.doris.datasource.EsExternalCatalog;
@@ -317,6 +318,7 @@ public class Env {
     private DeleteHandler deleteHandler;
     private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
     private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
+    private CooldownHandler cooldownHandler;
     private MetastoreEventsProcessor metastoreEventsProcessor;
 
     private MasterDaemon labelCleaner; // To clean old LabelInfo, 
ExportJobInfos
@@ -551,6 +553,7 @@ public class Env {
         this.deleteHandler = new DeleteHandler();
         this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
         this.partitionInMemoryInfoCollector = new 
PartitionInMemoryInfoCollector();
+        this.cooldownHandler = new CooldownHandler();
         this.metastoreEventsProcessor = new MetastoreEventsProcessor();
 
         this.replayedJournalId = new AtomicLong(0L);
@@ -1399,6 +1402,9 @@ public class Env {
         dbUsedDataQuotaInfoCollector.start();
         // start daemon thread to update global partition in memory 
information periodically
         partitionInMemoryInfoCollector.start();
+        if (Config.cooldown_single_remote_file) {
+            cooldownHandler.start();
+        }
         streamLoadRecordMgr.start();
         getInternalCatalog().getIcebergTableCreationRecordMgr().start();
         new InternalSchemaInitializer().start();
@@ -1781,6 +1787,12 @@ public class Env {
         return checksum;
     }
 
+    public long loadCooldownJob(DataInputStream dis, long checksum) throws 
IOException {
+        cooldownHandler.readField(dis);
+        LOG.info("finished replay loadCooldownJob from image");
+        return checksum;
+    }
+
     public long loadAlterJob(DataInputStream dis, long checksum) throws 
IOException {
         long newChecksum = checksum;
         for (JobType type : JobType.values()) {
@@ -2231,6 +2243,14 @@ public class Env {
         return checksum;
     }
 
+    /**
+     * Save CooldownJob.
+     */
+    public long saveCooldownJob(CountingDataOutputStream out, long checksum) 
throws IOException {
+        Env.getCurrentEnv().getCooldownHandler().write(out);
+        return checksum;
+    }
+
     public long saveMTMVJobManager(CountingDataOutputStream out, long 
checksum) throws IOException {
         if (Config.enable_mtmv_scheduler_framework) {
             Env.getCurrentEnv().getMTMVJobManager().write(out, checksum);
@@ -3438,6 +3458,10 @@ public class Env {
         return (MaterializedViewHandler) 
this.alter.getMaterializedViewHandler();
     }
 
+    public CooldownHandler getCooldownHandler() {
+        return cooldownHandler;
+    }
+
     public SystemHandler getClusterHandler() {
         return (SystemHandler) this.alter.getClusterHandler();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index fbe60b74c8..7f594afa7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -22,8 +22,11 @@ import org.apache.doris.clone.TabletSchedCtx;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -87,6 +90,10 @@ public class Tablet extends MetaObject implements Writable {
     private long checkedVersionHash;
     @SerializedName(value = "isConsistent")
     private boolean isConsistent;
+    @SerializedName(value = "cooldownReplicaId")
+    private long cooldownReplicaId;
+    @SerializedName(value = "cooldownTerm")
+    private long cooldownTerm;
 
     // last time that the tablet checker checks this tablet.
     // no need to persist
@@ -136,6 +143,22 @@ public class Tablet extends MetaObject implements Writable 
{
         return isConsistent;
     }
 
+    public long getCooldownReplicaId() {
+        return cooldownReplicaId;
+    }
+
+    public void setCooldownReplicaId(long cooldownReplicaId) {
+        this.cooldownReplicaId = cooldownReplicaId;
+    }
+
+    public long getCooldownTerm() {
+        return cooldownTerm;
+    }
+
+    public void setCooldownTerm(long cooldownTerm) {
+        this.cooldownTerm = cooldownTerm;
+    }
+
     private boolean deleteRedundantReplica(long backendId, long version) {
         boolean delete = false;
         boolean hasBackend = false;
@@ -326,18 +349,8 @@ public class Tablet extends MetaObject implements Writable 
{
 
     @Override
     public void write(DataOutput out) throws IOException {
-        super.write(out);
-
-        out.writeLong(id);
-        int replicaCount = replicas.size();
-        out.writeInt(replicaCount);
-        for (int i = 0; i < replicaCount; ++i) {
-            replicas.get(i).write(out);
-        }
-
-        out.writeLong(checkedVersion);
-        out.writeLong(checkedVersionHash);
-        out.writeBoolean(isConsistent);
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
     }
 
     @Override
@@ -359,6 +372,11 @@ public class Tablet extends MetaObject implements Writable 
{
     }
 
     public static Tablet read(DataInput in) throws IOException {
+        if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, Tablet.class);
+        }
+
         Tablet tablet = new Tablet();
         tablet.readFields(in);
         return tablet;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 73fff8fd70..98fcf380f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -46,6 +46,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,7 +66,7 @@ public class TabletInvertedIndex {
     public static final int NOT_EXIST_VALUE = -1;
 
     public static final TabletMeta NOT_EXIST_TABLET_META = new 
TabletMeta(NOT_EXIST_VALUE, NOT_EXIST_VALUE,
-            NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE, 
TStorageMedium.HDD);
+            NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE, 
TStorageMedium.HDD, -1, -1);
 
     private StampedLock lock = new StampedLock();
 
@@ -124,7 +125,8 @@ public class TabletInvertedIndex {
                              Map<Long, ListMultimap<Long, 
TPartitionVersionInfo>> transactionsToPublish,
                              ListMultimap<Long, Long> transactionsToClear,
                              ListMultimap<Long, Long> tabletRecoveryMap,
-                             List<Triple<Long, Integer, Boolean>> 
tabletToInMemory) {
+                             List<Triple<Long, Integer, Boolean>> 
tabletToInMemory,
+                             Map<Long, TabletMeta> syncCooldownTabletMap) {
         long stamp = readLock();
         long start = System.currentTimeMillis();
         try {
@@ -186,6 +188,11 @@ public class TabletInvertedIndex {
                                 }
                             }
 
+                            if (Config.cooldown_single_remote_file
+                                    && needChangeCooldownConf(tabletMeta, 
backendTabletInfo)) {
+                                
syncCooldownTabletMap.put(backendTabletInfo.getTabletId(), tabletMeta);
+                            }
+
                             long partitionId = tabletMeta.getPartitionId();
                             if (!Config.disable_storage_medium_check) {
                                 // check if need migration
@@ -328,6 +335,50 @@ public class TabletInvertedIndex {
         return false;
     }
 
+    private boolean needChangeCooldownConf(TabletMeta tabletMeta, TTabletInfo 
beTabletInfo) {
+        if (!beTabletInfo.isIsCooldown()) {
+            return false;
+        }
+        // check cooldown type in fe and be, they need to be the same.
+        if (tabletMeta.getCooldownReplicaId() != 
beTabletInfo.getCooldownReplicaId()) {
+            LOG.warn("cooldownReplicaId is wrong for tablet: {}, Fe: {}, Be: 
{}", beTabletInfo.getTabletId(),
+                    tabletMeta.getCooldownReplicaId(), 
beTabletInfo.getCooldownReplicaId());
+            return true;
+        }
+        // check cooldown type in one tablet, One UPLOAD_DATA is needed in the 
replicas.
+        long stamp = readLock();
+        try {
+            boolean replicaInvalid = true;
+            Map<Long, Replica> replicaMap = 
replicaMetaTable.row(beTabletInfo.getTabletId());
+            for (Map.Entry<Long, Replica> entry : replicaMap.entrySet()) {
+                if (entry.getValue().getId() == 
beTabletInfo.getCooldownReplicaId()) {
+                    replicaInvalid = false;
+                    break;
+                }
+            }
+            if (replicaInvalid) {
+                return true;
+            }
+        } finally {
+            readUnlock(stamp);
+        }
+        return false;
+    }
+
+    public List<Replica> getReplicas(Long tabletId) {
+        List<Replica> replicas = new LinkedList<>();
+        long stamp = readLock();
+        try {
+            Map<Long, Replica> replicaMap = replicaMetaTable.row(tabletId);
+            for (Map.Entry<Long, Replica> entry : replicaMap.entrySet()) {
+                replicas.add(entry.getValue());
+            }
+        } finally {
+            readUnlock(stamp);
+        }
+        return replicas;
+    }
+
     /**
      * Be will set `used' to false for bad replicas and `version_miss' to true 
for replicas with hole
      * in their version chain. In either case, those replicas need to be fixed 
by TabletScheduler.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
index 2679977b6d..bfaaba3eb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
@@ -37,10 +37,14 @@ public class TabletMeta {
 
     private TStorageMedium storageMedium;
 
+    private long cooldownReplicaId;
+
+    private long cooldownTerm;
+
     private ReentrantReadWriteLock lock;
 
     public TabletMeta(long dbId, long tableId, long partitionId, long indexId, 
int schemaHash,
-            TStorageMedium storageMedium) {
+            TStorageMedium storageMedium, long cooldownReplicaId, long 
cooldownTerm) {
         this.dbId = dbId;
         this.tableId = tableId;
         this.partitionId = partitionId;
@@ -50,6 +54,8 @@ public class TabletMeta {
         this.newSchemaHash = -1;
 
         this.storageMedium = storageMedium;
+        this.cooldownReplicaId = cooldownReplicaId;
+        this.cooldownTerm = cooldownTerm;
 
         lock = new ReentrantReadWriteLock();
     }
@@ -78,6 +84,22 @@ public class TabletMeta {
         this.storageMedium = storageMedium;
     }
 
+    public long getCooldownReplicaId() {
+        return cooldownReplicaId;
+    }
+
+    public void setCooldownReplicaId(long cooldownReplicaId) {
+        this.cooldownReplicaId = cooldownReplicaId;
+    }
+
+    public long getCooldownTerm() {
+        return cooldownTerm;
+    }
+
+    public void setCooldownTerm(long cooldownTerm) {
+        this.cooldownTerm = cooldownTerm;
+    }
+
     public int getOldSchemaHash() {
         lock.readLock().lock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 71cb5fd188..5e03365b52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -802,6 +802,13 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int alter_table_timeout_second = 86400 * 30; // 1month
+    /**
+     * Maximal timeout of push cooldown conf request.
+     */
+    @ConfField(mutable = false, masterOnly = true)
+    public static boolean cooldown_single_remote_file = false;
+    @ConfField(mutable = false, masterOnly = true)
+    public static int push_cooldown_conf_timeout_second = 600; // 10 min
     /**
      * If a backend is down for *max_backend_down_time_second*, a BACKEND_DOWN 
event will be triggered.
      * Do not set this if you know what you are doing.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
new file mode 100644
index 0000000000..9275a2633c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This class represents the olap replica related metadata.
+ */
+public class CooldownConf implements Writable {
+    private static final Logger LOG = LogManager.getLogger(CooldownConf.class);
+
+    @SerializedName(value = "dbId")
+    protected long dbId;
+    @SerializedName(value = "tableId")
+    protected long tableId;
+    @SerializedName(value = "partitionId")
+    protected long partitionId;
+    @SerializedName(value = "indexId")
+    protected long indexId;
+    @SerializedName(value = "tabletId")
+    protected long tabletId;
+    @SerializedName(value = "cooldownReplicaId")
+    protected long cooldownReplicaId;
+    @SerializedName(value = "cooldownTerm")
+    protected long cooldownTerm;
+
+    public CooldownConf(long dbId, long tableId, long partitionId, long 
indexId, long tabletId, long cooldownReplicaId,
+                        long cooldownTerm) {
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+        this.indexId = indexId;
+        this.tabletId = tabletId;
+        this.cooldownReplicaId = cooldownReplicaId;
+        this.cooldownTerm = cooldownTerm;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public void setDbId(long dbId) {
+        this.dbId = dbId;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public void setTableId(long tableId) {
+        this.tableId = tableId;
+    }
+
+    public long getPartitionId() {
+        return partitionId;
+    }
+
+    public void setPartitionId(long partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public long getIndexId() {
+        return indexId;
+    }
+
+    public void setIndexId(long indexId) {
+        this.indexId = indexId;
+    }
+
+    public long getTabletId() {
+        return tabletId;
+    }
+
+    public void setTabletId(long tabletId) {
+        this.tabletId = tabletId;
+    }
+
+    public long getCooldownReplicaId() {
+        return cooldownReplicaId;
+    }
+
+    public void setCooldownReplicaId(long cooldownReplicaId) {
+        this.cooldownReplicaId = cooldownReplicaId;
+    }
+
+    public long getCooldownTerm() {
+        return cooldownTerm;
+    }
+
+    public void setCooldownTerm(long cooldownTerm) {
+        this.cooldownTerm = cooldownTerm;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static CooldownJob read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, CooldownJob.class);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
new file mode 100644
index 0000000000..c0ec6bd5e7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.common.DdlException;
+
+/*
+ * This exception will be thrown when the cooldown job(v2) running failed.
+ */
+public class CooldownException extends DdlException {
+
+    private static final long serialVersionUID = 4844951783432954268L;
+
+    public CooldownException(String msg) {
+        super(msg);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
new file mode 100644
index 0000000000..e8c5ba6c9a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.util.MasterDaemon;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public class CooldownHandler extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(CooldownHandler.class);
+    private static final int MAX_ACTIVE_COOLDOWN_JOB_SIZE = 10;
+
+    private static final int MAX_RUNABLE_COOLDOWN_JOB_SIZE = 100;
+
+    private static final int MAX_TABLET_PER_JOB = 100;
+
+    private static final long timeoutMs = 1000L * 
Config.push_cooldown_conf_timeout_second;
+
+    // jobId -> CooldownJob, it is used to hold CooldownJob which is used to 
sent conf to be.
+    private final Map<Long, CooldownJob> runableCooldownJobs = 
Maps.newConcurrentMap();
+    private final Map<Long, Boolean> resetingTablet = Maps.newConcurrentMap();
+    // jobId -> CooldownJob,
+    public final Map<Long, CooldownJob> activeCooldownJobs = 
Maps.newConcurrentMap();
+
+    public final ThreadPoolExecutor cooldownThreadPool = 
ThreadPoolManager.newDaemonCacheThreadPool(
+            MAX_ACTIVE_COOLDOWN_JOB_SIZE, "cooldown-pool", true);
+
+    // syncCooldownTabletMap: tabletId -> TabletMeta
+    public void handleCooldownConf(Map<Long, TabletMeta> 
syncCooldownTabletMap) {
+        List<CooldownConf> cooldownConfList = new LinkedList<>();
+        for (Map.Entry<Long, TabletMeta> entry : 
syncCooldownTabletMap.entrySet()) {
+            if (runableCooldownJobs.size() >= MAX_RUNABLE_COOLDOWN_JOB_SIZE) {
+                return;
+            }
+            Long tabletId = entry.getKey();
+            TabletMeta tabletMeta = entry.getValue();
+            if (resetingTablet.containsKey(tabletId)) {
+                continue;
+            }
+            long cooldownReplicaId = -1;
+            List<Replica> replicas = 
Env.getCurrentInvertedIndex().getReplicas(tabletId);
+            if (replicas.size() == 0) {
+                continue;
+            }
+            for (Replica replica : replicas) {
+                if (tabletMeta.getCooldownReplicaId() == replica.getId()) {
+                    cooldownReplicaId = tabletMeta.getCooldownReplicaId();
+                    break;
+                }
+            }
+            long cooldownTerm = tabletMeta.getCooldownTerm();
+            if (cooldownReplicaId == -1) {
+                Random rand = new Random(System.currentTimeMillis());
+                int index = rand.nextInt(replicas.size());
+                cooldownReplicaId = replicas.get(index).getId();
+                ++cooldownTerm;
+            }
+            CooldownConf cooldownConf = new CooldownConf(tabletMeta.getDbId(), 
tabletMeta.getTableId(),
+                    tabletMeta.getPartitionId(), tabletMeta.getIndexId(), 
tabletId, cooldownReplicaId, cooldownTerm);
+            cooldownConfList.add(cooldownConf);
+            if (cooldownConfList.size() >= MAX_TABLET_PER_JOB) {
+                long jobId = Env.getCurrentEnv().getNextId();
+                CooldownJob cooldownJob = new CooldownJob(jobId, 
cooldownConfList, timeoutMs);
+                runableCooldownJobs.put(jobId, cooldownJob);
+                for (CooldownConf conf : cooldownConfList) {
+                    resetingTablet.put(conf.getTabletId(), true);
+                }
+                cooldownConfList = new LinkedList<>();
+            }
+        }
+        if (cooldownConfList.size() > 0) {
+            long jobId = Env.getCurrentEnv().getNextId();
+            CooldownJob cooldownJob = new CooldownJob(jobId, cooldownConfList, 
timeoutMs);
+            runableCooldownJobs.put(jobId, cooldownJob);
+            for (CooldownConf conf : cooldownConfList) {
+                resetingTablet.put(conf.getTabletId(), true);
+            }
+        }
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        clearFinishedOrCancelledCooldownJob();
+        runableCooldownJobs.values().forEach(cooldownJob -> {
+            if (!cooldownJob.isDone() && 
!activeCooldownJobs.containsKey(cooldownJob.getJobId())
+                    && activeCooldownJobs.size() < 
MAX_ACTIVE_COOLDOWN_JOB_SIZE) {
+                if (FeConstants.runningUnitTest) {
+                    cooldownJob.run();
+                } else {
+                    cooldownThreadPool.submit(() -> {
+                        if 
(activeCooldownJobs.putIfAbsent(cooldownJob.getJobId(), cooldownJob) == null) {
+                            try {
+                                cooldownJob.run();
+                            } finally {
+                                
activeCooldownJobs.remove(cooldownJob.getJobId());
+                            }
+                        }
+                    });
+                }
+            }
+        });
+    }
+
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(runableCooldownJobs.size());
+        for (CooldownJob cooldownJob : runableCooldownJobs.values()) {
+            cooldownJob.write(out);
+        }
+    }
+
+    public void readField(DataInput in) throws IOException {
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            CooldownJob cooldownJob = CooldownJob.read(in);
+            replayCooldownJob(cooldownJob);
+        }
+    }
+
+    public void replayCooldownJob(CooldownJob cooldownJob) {
+        CooldownJob replayCooldownJob = null;
+        if (!runableCooldownJobs.containsKey(cooldownJob.getJobId())) {
+            replayCooldownJob = new CooldownJob(cooldownJob.jobId, 
cooldownJob.getCooldownConfList(),
+                    cooldownJob.timeoutMs);
+            runableCooldownJobs.put(cooldownJob.getJobId(), replayCooldownJob);
+            for (CooldownConf cooldownConf : 
cooldownJob.getCooldownConfList()) {
+                resetingTablet.put(cooldownConf.getTabletId(), true);
+            }
+        } else {
+            replayCooldownJob = 
runableCooldownJobs.get(cooldownJob.getJobId());
+        }
+        replayCooldownJob.replay(cooldownJob);
+        if (replayCooldownJob.isDone()) {
+            runableCooldownJobs.remove(cooldownJob.getJobId());
+            for (CooldownConf cooldownConf : 
cooldownJob.getCooldownConfList()) {
+                resetingTablet.remove(cooldownConf.getTabletId());
+            }
+        }
+    }
+
+    private void clearFinishedOrCancelledCooldownJob() {
+        Iterator<Map.Entry<Long, CooldownJob>> iterator = 
runableCooldownJobs.entrySet().iterator();
+        while (iterator.hasNext()) {
+            CooldownJob cooldownJob = iterator.next().getValue();
+            if (cooldownJob.isDone()) {
+                iterator.remove();
+                for (CooldownConf cooldownConf : 
cooldownJob.getCooldownConfList()) {
+                    resetingTablet.remove(cooldownConf.getTabletId());
+                }
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
new file mode 100644
index 0000000000..4c7eb168b6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.PushCooldownConfTask;
+import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class CooldownJob implements Writable {
+    private static final Logger LOG = LogManager.getLogger(CooldownJob.class);
+
+    public enum JobState {
+        PENDING, // Job is created
+        SEND_CONF, // send cooldown task to BE.
+        RUNNING, // cooldown tasks are sent to BE, and waiting for them 
finished.
+        FINISHED, // job is done
+        CANCELLED; // job is cancelled(failed or be cancelled by user)
+
+        public boolean isFinalState() {
+            return this == CooldownJob.JobState.FINISHED || this == 
CooldownJob.JobState.CANCELLED;
+        }
+    }
+
+    @SerializedName(value = "jobId")
+    protected long jobId;
+    @SerializedName(value = "jobState")
+    protected CooldownJob.JobState jobState;
+    @SerializedName(value = "cooldownConfList")
+    protected List<CooldownConf> cooldownConfList;
+
+    @SerializedName(value = "errMsg")
+    protected String errMsg = "";
+    @SerializedName(value = "createTimeMs")
+    protected long createTimeMs = -1;
+    @SerializedName(value = "finishedTimeMs")
+    protected long finishedTimeMs = -1;
+    @SerializedName(value = "timeoutMs")
+    protected long timeoutMs = -1;
+
+    public long getJobId() {
+        return jobId;
+    }
+
+    public JobState getJobState() {
+        return jobState;
+    }
+
+    public List<CooldownConf> getCooldownConfList() {
+        return cooldownConfList;
+    }
+
+    public long getTimeoutMs() {
+        return timeoutMs;
+    }
+
+    private AgentBatchTask cooldownBatchTask = new AgentBatchTask();
+
+    public CooldownJob(long jobId, List<CooldownConf> cooldownConfList, long 
timeoutMs) {
+        this.jobId = jobId;
+        this.jobState = JobState.PENDING;
+        this.cooldownConfList = cooldownConfList;
+        this.createTimeMs = System.currentTimeMillis();
+        this.timeoutMs = timeoutMs;
+    }
+
+    protected void runPendingJob() throws CooldownException {
+        Preconditions.checkState(jobState == CooldownJob.JobState.PENDING, 
jobState);
+        this.jobState = JobState.SEND_CONF;
+        // write edit log
+        for (CooldownConf cooldownConf : cooldownConfList) {
+            setCooldownReplica(cooldownConf.getDbId(), 
cooldownConf.getTableId(), cooldownConf.getPartitionId(),
+                    cooldownConf.getIndexId(), cooldownConf.getTabletId(), 
cooldownConf.getCooldownReplicaId(),
+                    cooldownConf.getCooldownTerm());
+            
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownReplicaId(
+                    cooldownConf.getCooldownReplicaId());
+            
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownTerm(
+                    cooldownConf.getCooldownTerm());
+        }
+        Env.getCurrentEnv().getEditLog().logCooldownJob(this);
+        LOG.info("send cooldown job {} state to {}", jobId, this.jobState);
+    }
+
+    protected void runSendJob() throws CooldownException {
+        Preconditions.checkState(jobState == JobState.SEND_CONF, jobState);
+        LOG.info("begin to send cooldown conf tasks. job: {}", jobId);
+        if (!FeConstants.runningUnitTest) {
+            Map<Long, List<CooldownConf>> cooldownMap = new HashMap<>();
+            for (CooldownConf cooldownConf : cooldownConfList) {
+                Database db = Env.getCurrentInternalCatalog()
+                        .getDbOrException(cooldownConf.getDbId(), s -> new 
CooldownException(
+                                "Database " + s + " does not exist"));
+                OlapTable tbl;
+                try {
+                    tbl = (OlapTable) 
db.getTableOrMetaException(cooldownConf.getTableId(), TableIf.TableType.OLAP);
+                } catch (MetaNotFoundException e) {
+                    throw new CooldownException(e.getMessage());
+                }
+                if (tbl == null) {
+                    throw new CooldownException(String.format("No table: %d", 
cooldownConf.getTableId()));
+                }
+                tbl.readLock();
+                try {
+                    Partition partition = 
tbl.getPartition(cooldownConf.getPartitionId());
+                    if (partition == null) {
+                        throw new CooldownException(String.format("No 
partition: %d", cooldownConf.getPartitionId()));
+                    }
+                    MaterializedIndex index = 
partition.getIndex(cooldownConf.getIndexId());
+                    if (index == null) {
+                        throw new CooldownException(String.format("No index: 
%d", cooldownConf.getIndexId()));
+                    }
+                    Tablet tablet = 
index.getTablet(cooldownConf.getTabletId());
+                    if (tablet == null) {
+                        throw new CooldownException(String.format("No tablet: 
%d", cooldownConf.getTabletId()));
+                    }
+                    for (Replica replica : tablet.getReplicas()) {
+                        if (!cooldownMap.containsKey(replica.getBackendId())) {
+                            cooldownMap.put(replica.getBackendId(), new 
LinkedList<>());
+                        }
+                        
cooldownMap.get(replica.getBackendId()).add(cooldownConf);
+                    }
+                } finally {
+                    tbl.readUnlock();
+                }
+            }
+            for (Map.Entry<Long, List<CooldownConf>> entry : 
cooldownMap.entrySet()) {
+                PushCooldownConfTask pushCooldownConfTask = new 
PushCooldownConfTask(entry.getKey(), entry.getValue());
+                cooldownBatchTask.addTask(pushCooldownConfTask);
+            }
+            AgentTaskQueue.addBatchTask(cooldownBatchTask);
+            AgentTaskExecutor.submit(cooldownBatchTask);
+        }
+
+        this.jobState = JobState.RUNNING;
+        // write edit log
+        Env.getCurrentEnv().getEditLog().logCooldownJob(this);
+        LOG.info("send cooldown job {} state to {}", jobId, this.jobState);
+    }
+
+    protected void runRunningJob() throws CooldownException {
+        if (!cooldownBatchTask.isFinished()) {
+            LOG.info("cooldown tasks not finished. job: {}", jobId);
+            List<AgentTask> tasks = cooldownBatchTask.getUnfinishedTasks(2000);
+            for (AgentTask task : tasks) {
+                if (task.getFailedTimes() >= 3) {
+                    task.setFinished(true);
+                    AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUSH_COOLDOWN_CONF, task.getSignature());
+                    LOG.warn("push cooldown conf task failed after try three 
times: " + task.getErrorMsg());
+                    throw new CooldownException("cooldown tasks failed on 
backend: " + task.getBackendId());
+                }
+            }
+            return;
+        }
+        this.jobState = CooldownJob.JobState.FINISHED;
+        this.finishedTimeMs = System.currentTimeMillis();
+
+        Env.getCurrentEnv().getEditLog().logCooldownJob(this);
+        LOG.info("push cooldown conf job finished: {}", jobId);
+    }
+
+    public boolean isTimeout() {
+        return System.currentTimeMillis() - createTimeMs > timeoutMs;
+    }
+
+    public boolean isDone() {
+        return jobState.isFinalState();
+    }
+
+    /*
+     * cancelImpl() can be called any time any place.
+     * We need to clean any possible residual of this job.
+     */
+    protected synchronized void cancelImpl(String errMsg) {
+        if (jobState.isFinalState()) {
+            return;
+        }
+
+        cancelInternal();
+
+        this.errMsg = errMsg;
+        this.finishedTimeMs = System.currentTimeMillis();
+        LOG.info("cancel cooldown job {}, err: {}", jobId, errMsg);
+        Env.getCurrentEnv().getEditLog().logCooldownJob(this);
+    }
+
+    /**
+     * The keyword 'synchronized' only protects 2 methods:
+     * run() and cancel()
+     * Only these 2 methods can be visited by different thread(internal 
working thread and user connection thread)
+     * So using 'synchronized' to make sure only one thread can run the job at 
one time.
+     *
+     * lock order:
+     *      synchronized
+     *      db lock
+     */
+    public synchronized void run() {
+        if (isTimeout()) {
+            cancelImpl("Timeout");
+            return;
+        }
+
+        try {
+            switch (jobState) {
+                case PENDING:
+                    runPendingJob();
+                    break;
+                case SEND_CONF:
+                    runSendJob();
+                    break;
+                case RUNNING:
+                    runRunningJob();
+                    break;
+                default:
+                    break;
+            }
+        } catch (CooldownException e) {
+            cancelImpl(e.getMessage());
+        }
+    }
+
+    public void replay(CooldownJob replayedJob) {
+        try {
+            switch (replayedJob.jobState) {
+                case PENDING:
+                    replayCreateJob(replayedJob);
+                    break;
+                case SEND_CONF:
+                    replayPengingJob();
+                    break;
+                case FINISHED:
+                    replayRunningJob(replayedJob);
+                    break;
+                case CANCELLED:
+                    replayCancelled(replayedJob);
+                    break;
+                default:
+                    break;
+            }
+        } catch (CooldownException e) {
+            LOG.warn("[INCONSISTENT META] replay cooldown job failed {}", 
replayedJob.jobId, e);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static CooldownJob read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, CooldownJob.class);
+    }
+
+    /**
+     * Replay job in PENDING state.
+     * Should replay all changes before this job's state transfer to PENDING.
+     * These changes should be same as changes in CooldownHandler.createJob()
+     */
+    private void replayCreateJob(CooldownJob replayedJob) {
+        jobId = replayedJob.jobId;
+        for (CooldownConf replayedConf : replayedJob.cooldownConfList) {
+            CooldownConf cooldownConf = new 
CooldownConf(replayedConf.getDbId(), replayedConf.getTableId(),
+                    replayedConf.getPartitionId(), replayedConf.getIndexId(), 
replayedConf.getTabletId(),
+                    replayedConf.getCooldownReplicaId(), 
replayedConf.getCooldownTerm());
+            cooldownConfList.add(cooldownConf);
+        }
+        createTimeMs = replayedJob.createTimeMs;
+        timeoutMs = replayedJob.timeoutMs;
+        jobState = JobState.PENDING;
+        LOG.info("replay create cooldown job: {}, conf size: {}", jobId, 
cooldownConfList.size());
+    }
+
+    /**
+     * Replay job in PENDING state. set cooldown type in Replica
+     */
+    private void replayPengingJob() throws CooldownException {
+        for (CooldownConf cooldownConf : cooldownConfList) {
+            setCooldownReplica(cooldownConf.getDbId(), 
cooldownConf.getTableId(), cooldownConf.getPartitionId(),
+                    cooldownConf.getIndexId(), cooldownConf.getTabletId(), 
cooldownConf.getCooldownReplicaId(),
+                    cooldownConf.getCooldownTerm());
+            if 
(Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()) != 
null) {
+                
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownReplicaId(
+                        cooldownConf.getCooldownReplicaId());
+                
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownTerm(
+                        cooldownConf.getCooldownTerm());
+            }
+        }
+        jobState = JobState.SEND_CONF;
+        LOG.info("replay send cooldown conf, job: {}", jobId);
+    }
+
+    /**
+     * Replay job in FINISHED state.
+     * Should replay all changes in runRunningJob()
+     */
+    private void replayRunningJob(CooldownJob replayedJob) throws 
CooldownException {
+        jobState = CooldownJob.JobState.FINISHED;
+        this.finishedTimeMs = replayedJob.finishedTimeMs;
+        LOG.info("replay finished cooldown job: {}", jobId);
+    }
+
+    private void setCooldownReplica(long dbId, long tableId, long partitionId, 
long indexId, long tabletId,
+                                    long cooldownReplicaId, long cooldownTerm) 
throws CooldownException {
+        Database db = Env.getCurrentInternalCatalog()
+                .getDbOrException(dbId, s -> new CooldownException("Database " 
+ s + " does not exist"));
+        OlapTable tbl;
+        try {
+            tbl = (OlapTable) db.getTableOrMetaException(tableId, 
TableIf.TableType.OLAP);
+        } catch (MetaNotFoundException e) {
+            throw new CooldownException(e.getMessage());
+        }
+        if (tbl != null) {
+            tbl.writeLock();
+            try {
+                Partition partition = tbl.getPartition(partitionId);
+                if (partition != null) {
+                    MaterializedIndex index = partition.getIndex(indexId);
+                    if (index != null) {
+                        Tablet tablet = index.getTablet(tabletId);
+                        if (tablet != null) {
+                            tablet.setCooldownReplicaId(cooldownReplicaId);
+                            tablet.setCooldownTerm(cooldownTerm);
+                            LOG.info("setCooldownReplicaId to {} when cancel 
job: {}:{}", cooldownReplicaId,
+                                    tablet.getId(), jobId);
+                            return;
+                        }
+                    }
+                }
+                throw new CooldownException("set cooldown type failed.");
+            } finally {
+                tbl.writeUnlock();
+            }
+        }
+    }
+
+    private void cancelInternal() {
+        // clear tasks if has
+        AgentTaskQueue.removeBatchTask(cooldownBatchTask, 
TTaskType.PUSH_COOLDOWN_CONF);
+        jobState = CooldownJob.JobState.CANCELLED;
+    }
+
+    /**
+     * Replay job in CANCELLED state.
+     */
+    private void replayCancelled(CooldownJob replayedJob) {
+        cancelInternal();
+        this.jobState = CooldownJob.JobState.CANCELLED;
+        this.finishedTimeMs = replayedJob.finishedTimeMs;
+        this.errMsg = replayedJob.errMsg;
+        LOG.info("replay cancelled cooldown job: {}", jobId);
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 261a1704af..04c03cbdef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -361,8 +361,9 @@ public class InternalCatalog implements CatalogIf<Database> 
{
                     for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                         long indexId = index.getId();
                         int schemaHash = 
olapTable.getSchemaHashByIndexId(indexId);
-                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium);
                         for (Tablet tablet : index.getTablets()) {
+                            TabletMeta tabletMeta = new TabletMeta(dbId, 
tableId, partitionId, indexId, schemaHash,
+                                    medium, tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
                             long tabletId = tablet.getId();
                             invertedIndex.addTablet(tabletId, tabletMeta);
                             for (Replica replica : tablet.getReplicas()) {
@@ -1285,8 +1286,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                     for (MaterializedIndex mIndex : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                         long indexId = mIndex.getId();
                         int schemaHash = 
olapTable.getSchemaHashByIndexId(indexId);
-                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium);
                         for (Tablet tablet : mIndex.getTablets()) {
+                            TabletMeta tabletMeta = new TabletMeta(dbId, 
tableId, partitionId, indexId, schemaHash,
+                                    medium, tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
                             long tabletId = tablet.getId();
                             invertedIndex.addTablet(tabletId, tabletMeta);
                             for (Replica replica : tablet.getReplicas()) {
@@ -1547,9 +1549,10 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                     long indexId = index.getId();
                     int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-                    TabletMeta tabletMeta = new TabletMeta(info.getDbId(), 
info.getTableId(), partition.getId(),
-                            index.getId(), schemaHash, 
info.getDataProperty().getStorageMedium());
                     for (Tablet tablet : index.getTablets()) {
+                        TabletMeta tabletMeta = new TabletMeta(info.getDbId(), 
info.getTableId(), partition.getId(),
+                                index.getId(), schemaHash, 
info.getDataProperty().getStorageMedium(),
+                                tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
                         long tabletId = tablet.getId();
                         invertedIndex.addTablet(tabletId, tabletMeta);
                         for (Replica replica : tablet.getReplicas()) {
@@ -1699,7 +1702,8 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
             // create tablets
             int schemaHash = indexMeta.getSchemaHash();
-            TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, schemaHash, storageMedium);
+            TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, schemaHash, storageMedium, -1,
+                    -1);
             createTablets(clusterName, index, ReplicaState.NORMAL, 
distributionInfo, version, replicaAlloc, tabletMeta,
                     tabletIdSet, idGeneratorBuffer);
 
@@ -2673,9 +2677,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                     for (MaterializedIndex mIndex : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                         long indexId = mIndex.getId();
                         int schemaHash = 
olapTable.getSchemaHashByIndexId(indexId);
-                        TabletMeta tabletMeta = new TabletMeta(db.getId(), 
olapTable.getId(), partitionId, indexId,
-                                schemaHash, medium);
                         for (Tablet tablet : mIndex.getTablets()) {
+                            TabletMeta tabletMeta = new TabletMeta(db.getId(), 
olapTable.getId(), partitionId, indexId,
+                                    schemaHash, medium, 
tablet.getCooldownReplicaId(), tablet.getCooldownTerm());
                             long tabletId = tablet.getId();
                             invertedIndex.addTablet(tabletId, tabletMeta);
                             for (Replica replica : tablet.getReplicas()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index fb8fbb5b99..232509f60b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -36,6 +36,7 @@ import org.apache.doris.cluster.Cluster;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
+import org.apache.doris.cooldown.CooldownJob;
 import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -579,6 +580,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_PUSH_COOLDOWN_CONF: {
+                data = CooldownJob.read(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_BATCH_ADD_ROLLUP: {
                 data = BatchAlterJobPersistInfo.read(in);
                 isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 6868de1a9b..021f049aa0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -44,6 +44,7 @@ import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.task.PushCooldownConfTask;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.task.SnapshotTask;
 import org.apache.doris.task.StorageMediaMigrationTask;
@@ -196,6 +197,9 @@ public class MasterImpl {
                 case UPDATE_TABLET_META_INFO:
                     finishUpdateTabletMeta(task, request);
                     break;
+                case PUSH_COOLDOWN_CONF:
+                    finishPushCooldownConfTask(task);
+                    break;
                 default:
                     break;
             }
@@ -598,4 +602,10 @@ public class MasterImpl {
         }
         AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, 
task.getSignature());
     }
+
+    private void finishPushCooldownConfTask(AgentTask task) {
+        PushCooldownConfTask cooldownTask = (PushCooldownConfTask) task;
+        cooldownTask.setFinished(true);
+        AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUSH_COOLDOWN_CONF, task.getSignature());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index c8e03e4d6b..50add0b24e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -259,6 +259,8 @@ public class ReportHandler extends Daemon {
         Set<Long> tabletFoundInMeta = Sets.newConcurrentHashSet();
         // storage medium -> tablet id
         ListMultimap<TStorageMedium, Long> tabletMigrationMap = 
LinkedListMultimap.create();
+        // the cooldown type of replicas which need to be sync. tabletId -> 
TabletMeta
+        Map<Long, TabletMeta> syncCooldownTabletMap = new HashMap<>();
 
         // dbid -> txn id -> [partition info]
         Map<Long, ListMultimap<Long, TPartitionVersionInfo>> 
transactionsToPublish = Maps.newHashMap();
@@ -278,7 +280,8 @@ public class ReportHandler extends Daemon {
                 transactionsToPublish,
                 transactionsToClear,
                 tabletRecoveryMap,
-                tabletToInMemory);
+                tabletToInMemory,
+                syncCooldownTabletMap);
 
         // 2. sync
         if (!tabletSyncMap.isEmpty()) {
@@ -321,6 +324,11 @@ public class ReportHandler extends Daemon {
             handleSetTabletInMemory(backendId, tabletToInMemory);
         }
 
+        // 10. send cooldownType which need sync to CooldownHandler
+        if (!syncCooldownTabletMap.isEmpty()) {
+            
Env.getCurrentEnv().getCooldownHandler().handleCooldownConf(syncCooldownTabletMap);
+        }
+
         final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
         Backend reportBackend = currentSystemInfo.getBackend(backendId);
         if (reportBackend != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 6ac6ab0700..fe757ff3aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
+import org.apache.doris.cooldown.CooldownJob;
 import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -744,6 +745,10 @@ public class EditLog {
                     }
                     break;
                 }
+                case OperationType.OP_PUSH_COOLDOWN_CONF:
+                    CooldownJob cooldownJob = (CooldownJob) journal.getData();
+                    env.getCooldownHandler().replayCooldownJob(cooldownJob);
+                    break;
                 case OperationType.OP_BATCH_ADD_ROLLUP: {
                     BatchAlterJobPersistInfo batchAlterJobV2 = 
(BatchAlterJobPersistInfo) journal.getData();
                     for (AlterJobV2 alterJobV2 : 
batchAlterJobV2.getAlterJobV2List()) {
@@ -1505,6 +1510,10 @@ public class EditLog {
         logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
     }
 
+    public void logCooldownJob(CooldownJob cooldownJob) {
+        logEdit(OperationType.OP_PUSH_COOLDOWN_CONF, cooldownJob);
+    }
+
     public void logBatchAlterJob(BatchAlterJobPersistInfo batchAlterJobV2) {
         logEdit(OperationType.OP_BATCH_ADD_ROLLUP, batchAlterJobV2);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index c5889a8001..13009ba85d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -76,6 +76,8 @@ public class OperationType {
 
     //schema change for add and drop columns
     public static final short OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 128;
+    // set cooldown conf in replica
+    public static final short OP_PUSH_COOLDOWN_CONF = 129;
 
     // 30~39 130~139 230~239 ...
     // load job for only hadoop load
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
index 5ab8c19d0e..896aaae698 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
@@ -209,6 +209,12 @@ public class MetaPersistMethod {
                 metaPersistMethod.writeMethod = 
Env.class.getDeclaredMethod("saveMTMVJobManager",
                         CountingDataOutputStream.class, long.class);
                 break;
+            case "cooldownJob":
+                metaPersistMethod.readMethod = 
Env.class.getDeclaredMethod("loadCooldownJob", DataInputStream.class,
+                        long.class);
+                metaPersistMethod.writeMethod = 
Env.class.getDeclaredMethod("saveCooldownJob",
+                        CountingDataOutputStream.class, long.class);
+                break;
             default:
                 break;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
 
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
index 5defa01e9d..e4db00bc8e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
@@ -38,7 +38,7 @@ public class PersistMetaModules {
             "masterInfo", "frontends", "backends", "datasource", "db", 
"loadJob", "alterJob", "recycleBin",
             "globalVariable", "cluster", "broker", "resources", "exportJob", 
"syncJob", "backupHandler",
             "paloAuth", "transactionState", "colocateTableIndex", 
"routineLoadJobs", "loadJobV2", "smallFiles",
-            "plugins", "deleteHandler", "sqlBlockRule", "policy", 
"mtmvJobManager");
+            "plugins", "deleteHandler", "sqlBlockRule", "policy", 
"mtmvJobManager", "cooldownJob");
 
     static {
         MODULES_MAP = Maps.newHashMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 2319bf8886..0208058166 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -37,6 +37,7 @@ import org.apache.doris.thrift.TGetStoragePolicy;
 import org.apache.doris.thrift.TMoveDirReq;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishVersionRequest;
+import org.apache.doris.thrift.TPushCooldownConfReq;
 import org.apache.doris.thrift.TPushReq;
 import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TReleaseSnapshotRequest;
@@ -359,6 +360,15 @@ public class AgentBatchTask implements Runnable {
                 tAgentTaskRequest.setUpdatePolicy(request);
                 return tAgentTaskRequest;
             }
+            case PUSH_COOLDOWN_CONF: {
+                PushCooldownConfTask pushCooldownConfTask = 
(PushCooldownConfTask) task;
+                TPushCooldownConfReq request = pushCooldownConfTask.toThrift();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(request.toString());
+                }
+                tAgentTaskRequest.setPushCooldownConf(request);
+                return tAgentTaskRequest;
+            }
             default:
                 LOG.debug("could not find task type for task [{}]", task);
                 return null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/PushCooldownConfTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PushCooldownConfTask.java
new file mode 100644
index 0000000000..06ea11d659
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushCooldownConfTask.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.cooldown.CooldownConf;
+import org.apache.doris.thrift.TCooldownConf;
+import org.apache.doris.thrift.TPushCooldownConfReq;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class PushCooldownConfTask extends AgentTask {
+    private static final Logger LOG = 
LogManager.getLogger(PushCooldownConfTask.class);
+
+    private List<CooldownConf> cooldownConfList;
+
+    public PushCooldownConfTask(long backendId, List<CooldownConf> 
cooldownConfList) {
+        super(null, backendId, TTaskType.PUSH_COOLDOWN_CONF, -1, -1, -1, -1, 
-1);
+
+        this.cooldownConfList = cooldownConfList;
+    }
+
+    public TPushCooldownConfReq toThrift() {
+        TPushCooldownConfReq pushCooldownConfReq = new TPushCooldownConfReq();
+        for (CooldownConf cooldownConf : cooldownConfList) {
+            TCooldownConf tCooldownConf = new TCooldownConf();
+            tCooldownConf.setTabletId(cooldownConf.getTabletId());
+            
tCooldownConf.setCooldownReplicaId(cooldownConf.getCooldownReplicaId());
+            tCooldownConf.setCooldownTerm(cooldownConf.getCooldownTerm());
+            pushCooldownConfReq.addToCooldownConfs(tCooldownConf);
+        }
+        return pushCooldownConfReq;
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java 
b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
index 1c8a0130cc..42e788b537 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
@@ -249,7 +249,7 @@ public class CatalogMocker {
 
         Tablet tablet0 = new Tablet(TEST_TABLET0_ID);
         TabletMeta tabletMeta = new TabletMeta(TEST_DB_ID, TEST_TBL_ID, 
TEST_SINGLE_PARTITION_ID,
-                                               TEST_TBL_ID, SCHEMA_HASH, 
TStorageMedium.HDD);
+                                               TEST_TBL_ID, SCHEMA_HASH, 
TStorageMedium.HDD, -1, -1);
         baseIndex.addTablet(tablet0, tabletMeta);
         Replica replica0 = new Replica(TEST_REPLICA0_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica1 = new Replica(TEST_REPLICA1_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
@@ -323,7 +323,7 @@ public class CatalogMocker {
 
         Tablet baseTabletP1 = new Tablet(TEST_BASE_TABLET_P1_ID);
         TabletMeta tabletMetaBaseTabletP1 = new TabletMeta(TEST_DB_ID, 
TEST_TBL2_ID, TEST_PARTITION1_ID,
-                                                           TEST_TBL2_ID, 
SCHEMA_HASH, TStorageMedium.HDD);
+                                                           TEST_TBL2_ID, 
SCHEMA_HASH, TStorageMedium.HDD, -1, -1);
         baseIndexP1.addTablet(baseTabletP1, tabletMetaBaseTabletP1);
         Replica replica3 = new Replica(TEST_REPLICA3_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica4 = new Replica(TEST_REPLICA4_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
@@ -335,7 +335,7 @@ public class CatalogMocker {
 
         Tablet baseTabletP2 = new Tablet(TEST_BASE_TABLET_P2_ID);
         TabletMeta tabletMetaBaseTabletP2 = new TabletMeta(TEST_DB_ID, 
TEST_TBL2_ID, TEST_PARTITION2_ID,
-                                                           TEST_TBL2_ID, 
SCHEMA_HASH, TStorageMedium.HDD);
+                                                           TEST_TBL2_ID, 
SCHEMA_HASH, TStorageMedium.HDD, -1, -1);
         baseIndexP2.addTablet(baseTabletP2, tabletMetaBaseTabletP2);
         Replica replica6 = new Replica(TEST_REPLICA6_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica7 = new Replica(TEST_REPLICA7_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
@@ -356,7 +356,7 @@ public class CatalogMocker {
         Tablet rollupTabletP1 = new Tablet(TEST_ROLLUP_TABLET_P1_ID);
         TabletMeta tabletMetaRollupTabletP1 = new TabletMeta(TEST_DB_ID, 
TEST_TBL2_ID, TEST_PARTITION1_ID,
                                                              
TEST_ROLLUP_TABLET_P1_ID, ROLLUP_SCHEMA_HASH,
-                                                             
TStorageMedium.HDD);
+                                                             
TStorageMedium.HDD, -1, -1);
         rollupIndexP1.addTablet(rollupTabletP1, tabletMetaRollupTabletP1);
         Replica replica9 = new Replica(TEST_REPLICA9_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica10 = new Replica(TEST_REPLICA10_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
@@ -373,7 +373,7 @@ public class CatalogMocker {
         Tablet rollupTabletP2 = new Tablet(TEST_ROLLUP_TABLET_P2_ID);
         TabletMeta tabletMetaRollupTabletP2 = new TabletMeta(TEST_DB_ID, 
TEST_TBL2_ID, TEST_PARTITION1_ID,
                                                              
TEST_ROLLUP_TABLET_P2_ID, ROLLUP_SCHEMA_HASH,
-                                                             
TStorageMedium.HDD);
+                                                             
TStorageMedium.HDD, -1, -1);
         rollupIndexP2.addTablet(rollupTabletP2, tabletMetaRollupTabletP2);
         Replica replica12 = new Replica(TEST_REPLICA12_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica13 = new Replica(TEST_REPLICA13_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 0c180e68aa..d79855ce67 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -193,7 +193,7 @@ public class CatalogTestUtil {
 
         // index
         MaterializedIndex index = new MaterializedIndex(indexId, 
IndexState.NORMAL);
-        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD);
+        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD, -1, -1);
         index.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica1);
@@ -260,7 +260,8 @@ public class CatalogTestUtil {
 
         // index
         MaterializedIndex index = new MaterializedIndex(testIndexId2, 
IndexState.NORMAL);
-        TabletMeta tabletMeta = new TabletMeta(testDbId1, testTableId2, 
testPartitionId2, testIndexId2, 0, TStorageMedium.HDD);
+        TabletMeta tabletMeta = new TabletMeta(testDbId1, testTableId2, 
testPartitionId2, testIndexId2, 0,
+                TStorageMedium.HDD, -1, -1);
         index.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index d7fdb2694a..14e20a1bd7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -66,7 +66,7 @@ public class TabletTest {
         };
 
         tablet = new Tablet(1);
-        TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, 
TStorageMedium.HDD);
+        TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, 
TStorageMedium.HDD, -1, -1);
         invertedIndex.addTablet(1, tabletMeta);
         replica1 = new Replica(1L, 1L, 100L, 0, 200000L, 0, 3000L, 
ReplicaState.NORMAL, 0, 0);
         replica2 = new Replica(2L, 2L, 100L, 0, 200000L, 0, 3000L, 
ReplicaState.NORMAL, 0, 0);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
index c81c3839c4..1ac66444ef 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
@@ -145,16 +145,16 @@ public class ClusterLoadStatisticsTest {
         // tablet
         invertedIndex = new TabletInvertedIndex();
 
-        invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD));
+        invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD, -1, -1));
         invertedIndex.addReplica(50000, new Replica(50001, be1.getId(), 0, 
ReplicaState.NORMAL));
         invertedIndex.addReplica(50000, new Replica(50002, be2.getId(), 0, 
ReplicaState.NORMAL));
         invertedIndex.addReplica(50000, new Replica(50003, be3.getId(), 0, 
ReplicaState.NORMAL));
 
-        invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD));
+        invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD, -1, -1));
         invertedIndex.addReplica(60000, new Replica(60002, be2.getId(), 0, 
ReplicaState.NORMAL));
         invertedIndex.addReplica(60000, new Replica(60003, be3.getId(), 0, 
ReplicaState.NORMAL));
 
-        invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD));
+        invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD, -1, -1));
         invertedIndex.addReplica(70000, new Replica(70002, be2.getId(), 0, 
ReplicaState.NORMAL));
         invertedIndex.addReplica(70000, new Replica(70003, be3.getId(), 0, 
ReplicaState.NORMAL));
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 88f02df4cb..756510de77 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -81,7 +81,7 @@ public class RebalancerTestUtil {
         int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
 
         TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(),
-                partition.getId(), baseIndex.getId(), schemaHash, medium);
+                partition.getId(), baseIndex.getId(), schemaHash, medium, -1, 
-1);
         Tablet tablet = new Tablet(tabletId);
 
         // add tablet to olapTable
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
index 4460fcb31f..d933b80982 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
@@ -78,7 +78,7 @@ public class UnitTestUtil {
 
         // index
         MaterializedIndex index = new MaterializedIndex(indexId, 
IndexState.NORMAL);
-        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD);
+        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD, -1, -1);
         index.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica1);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
new file mode 100644
index 0000000000..6e16e9e945
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cluster.Cluster;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.thrift.TStorageMedium;
+
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class CooldownJobTest {
+
+    private static long jobId = 100L;
+    private static long dbId = 101L;
+    private static long tableId = 102L;
+    private static long partitionId = 103L;
+    private static long indexId = 104L;
+    private static long tabletId = 105L;
+    private static long replicaId = 106L;
+    private static long backendId = 107L;
+    private static long cooldownReplicaId = 106L;
+    private static long cooldownTerm = 109L;
+    private static long timeoutMs = 10000L;
+    private static Tablet tablet = new Tablet(tabletId);
+    private static Replica replica = new Replica(replicaId, backendId, 1, 
Replica.ReplicaState.NORMAL);
+
+    private static CooldownConf cooldownConf = new CooldownConf(dbId, tableId, 
partitionId, indexId, tabletId,
+            cooldownReplicaId, cooldownTerm);
+
+    private static List<CooldownConf> cooldownConfList = new LinkedList<>();
+
+    @Mocked
+    private EditLog editLog;
+
+    public static CooldownJob createCooldownJob() {
+        tablet.setCooldownReplicaId(cooldownReplicaId);
+        tablet.setCooldownTerm(cooldownTerm);
+        cooldownConfList.add(cooldownConf);
+        return new CooldownJob(jobId, cooldownConfList, timeoutMs);
+    }
+
+    @Before
+    public void setUp() {
+        Cluster testCluster = new Cluster("test_cluster", 0);
+        Database db = new Database(dbId, "db1");
+        db.setClusterName("test_cluster");
+        Env.getCurrentEnv().addCluster(testCluster);
+        Env.getCurrentEnv().unprotectCreateDb(db);
+        OlapTable table = new OlapTable(tableId, "testTable", new 
ArrayList<>(), KeysType.DUP_KEYS,
+                new PartitionInfo(), null);
+        table.setId(tableId);
+        db.createTable(table);
+        MaterializedIndex baseIndex = new MaterializedIndex();
+        baseIndex.setIdForRestore(indexId);
+        Partition partition = new Partition(partitionId, "part1", baseIndex, 
null);
+        table.addPartition(partition);
+        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 1, TStorageMedium.HDD,
+                cooldownReplicaId, cooldownTerm);
+        baseIndex.addTablet(tablet, tabletMeta);
+        tablet.addReplica(replica);
+        Env.getCurrentEnv().setEditLog(editLog);
+    }
+
+    @Test
+    public void testPending() throws Exception {
+        CooldownJob cooldownJob = createCooldownJob();
+        cooldownJob.runPendingJob();
+        Assert.assertEquals(CooldownJob.JobState.SEND_CONF, 
cooldownJob.jobState);
+        for (CooldownConf conf : cooldownJob.getCooldownConfList()) {
+            Assert.assertEquals(conf.getCooldownReplicaId(), replica.getId());
+        }
+        CooldownJob job1 = createCooldownJob();
+        job1.replay(cooldownJob);
+        Assert.assertEquals(CooldownJob.JobState.SEND_CONF, job1.jobState);
+        // run send job
+        cooldownJob.runSendJob();
+        Assert.assertEquals(CooldownJob.JobState.RUNNING, 
cooldownJob.jobState);
+        // run replay finish job
+        cooldownJob.jobState = CooldownJob.JobState.FINISHED;
+        job1.replay(cooldownJob);
+        Assert.assertEquals(CooldownJob.JobState.FINISHED, job1.jobState);
+    }
+
+    @Test
+    public void testCancelJob() throws Exception {
+        CooldownJob cooldownJob = createCooldownJob();
+        cooldownJob.runPendingJob();
+        Assert.assertEquals(CooldownJob.JobState.SEND_CONF, 
cooldownJob.jobState);
+        Assert.assertEquals(cooldownReplicaId, replica.getId());
+        // run send job
+        cooldownJob.runSendJob();
+        Assert.assertEquals(CooldownJob.JobState.RUNNING, 
cooldownJob.jobState);
+        // run cancel job
+        cooldownJob.cancelImpl("test cancel");
+        Assert.assertEquals(CooldownJob.JobState.CANCELLED, 
cooldownJob.jobState);
+    }
+
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java 
b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
index e4310243c4..df8ed5a316 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
@@ -156,7 +156,8 @@ public abstract class DorisHttpTestCase {
 
         // index
         MaterializedIndex baseIndex = new MaterializedIndex(testIndexId, 
MaterializedIndex.IndexState.NORMAL);
-        TabletMeta tabletMeta = new TabletMeta(testDbId, testTableId, 
testPartitionId, testIndexId, testSchemaHash, TStorageMedium.HDD);
+        TabletMeta tabletMeta = new TabletMeta(testDbId, testTableId, 
testPartitionId, testIndexId, testSchemaHash,
+                TStorageMedium.HDD, -1, -1);
         baseIndex.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica1);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 5c2c005972..e464517d7f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -116,7 +116,7 @@ public class DeleteHandlerTest {
         auth = AccessTestUtil.fetchAdminAccess();
         analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
         db = CatalogMocker.mockDb();
-        TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID, 
TBL_ID, 0, null);
+        TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID, 
TBL_ID, 0, null, -1, -1);
         invertedIndex.addTablet(TABLET_ID, tabletMeta);
         invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_1, 
BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
         invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_2, 
BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index 38033224d6..3215c63d19 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -349,6 +349,16 @@ struct TPluginMetaInfo {
     4: optional string source
 }
 
+struct TCooldownConf {
+    1: required Types.TTabletId tablet_id
+    2: optional Types.TReplicaId cooldown_replica_id
+    3: optional i64 cooldown_term
+}
+
+struct TPushCooldownConfReq {
+    1: required list<TCooldownConf> cooldown_confs
+}
+
 struct TAgentTaskRequest {
     1: required TAgentServiceVersion protocol_version
     2: required Types.TTaskType task_type
@@ -380,6 +390,7 @@ struct TAgentTaskRequest {
     27: optional TCompactionReq compaction_req
     28: optional TStorageMigrationReqV2 storage_migration_req_v2
     29: optional TGetStoragePolicy update_policy
+    30: optional TPushCooldownConfReq push_cooldown_conf
 }
 
 struct TAgentResult {
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index 7c9b923198..6852f21426 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -42,6 +42,9 @@ struct TTabletInfo {
     15: optional Types.TReplicaId replica_id
     // data size on remote storage
     16: optional Types.TSize remote_data_size
+    17: optional Types.TReplicaId cooldown_replica_id
+    18: optional bool is_cooldown = false
+    19: optional i64 cooldown_term = -1
 }
 
 struct TFinishTaskRequest {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 589b95f77c..1a92304b38 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -202,7 +202,8 @@ enum TTaskType {
     UNINSTALL_PLUGIN,
     COMPACTION,
     STORAGE_MEDIUM_MIGRATE_V2,
-    NOTIFY_UPDATE_STORAGE_POLICY
+    NOTIFY_UPDATE_STORAGE_POLICY,
+    PUSH_COOLDOWN_CONF
 }
 
 enum TStmtType {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to