This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 241981ec31dc72520aff0184c014aa5d38e6e77f
Author: zzzxl <[email protected]>
AuthorDate: Wed Apr 10 17:56:13 2024 +0800

    [fix](inverted index) cloud mod supports time series (#33414)
---
 be/src/cloud/cloud_cumulative_compaction.cpp       |  17 ++-
 .../cloud/cloud_cumulative_compaction_policy.cpp   | 150 ++++++++++++++++++-
 be/src/cloud/cloud_cumulative_compaction_policy.h  |  63 ++++++--
 be/src/cloud/cloud_rowset_builder.cpp              |   4 +
 be/src/cloud/cloud_rowset_writer.cpp               |   1 +
 be/src/cloud/cloud_storage_engine.cpp              |  18 ++-
 be/src/cloud/cloud_storage_engine.h                |  11 +-
 be/src/cloud/cloud_tablet.cpp                      |  18 +++
 be/src/cloud/cloud_tablet_mgr.cpp                  |   1 +
 be/src/olap/compaction.cpp                         |   5 +
 be/src/olap/rowset/rowset_writer_context.h         |   2 +
 cloud/src/meta-service/meta_service.cpp            |  17 +++
 .../org/apache/doris/alter/CloudRollupJobV2.java   |   7 +-
 .../apache/doris/alter/CloudSchemaChangeJobV2.java |   7 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |  12 +-
 .../cloud/alter/CloudSchemaChangeHandler.java      | 160 ++++++++++++++++++++-
 .../cloud/datasource/CloudInternalCatalog.java     |  19 ++-
 gensrc/proto/cloud.proto                           |   6 +
 18 files changed, 483 insertions(+), 35 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp 
b/be/src/cloud/cloud_cumulative_compaction.cpp
index 4e43246eebd..39ce711c9db 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -200,8 +200,11 @@ Status CloudCumulativeCompaction::execute_compact() {
 Status CloudCumulativeCompaction::modify_rowsets() {
     // calculate new cumulative point
     int64_t input_cumulative_point = cloud_tablet()->cumulative_layer_point();
-    int64_t new_cumulative_point = 
_engine.cumu_compaction_policy()->new_cumulative_point(
-            cloud_tablet(), _output_rowset, _last_delete_version, 
input_cumulative_point);
+    auto compaction_policy = 
cloud_tablet()->tablet_meta()->compaction_policy();
+    int64_t new_cumulative_point =
+            _engine.cumu_compaction_policy(compaction_policy)
+                    ->new_cumulative_point(cloud_tablet(), _output_rowset, 
_last_delete_version,
+                                           input_cumulative_point);
     // commit compaction job
     cloud::TabletJobInfoPB job;
     auto idx = job.mutable_idx();
@@ -352,10 +355,12 @@ Status 
CloudCumulativeCompaction::pick_rowsets_to_compact() {
     }
 
     size_t compaction_score = 0;
-    _engine.cumu_compaction_policy()->pick_input_rowsets(
-            cloud_tablet(), candidate_rowsets, 
config::cumulative_compaction_max_deltas,
-            config::cumulative_compaction_min_deltas, &_input_rowsets, 
&_last_delete_version,
-            &compaction_score);
+    auto compaction_policy = 
cloud_tablet()->tablet_meta()->compaction_policy();
+    _engine.cumu_compaction_policy(compaction_policy)
+            ->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
+                                 config::cumulative_compaction_max_deltas,
+                                 config::cumulative_compaction_min_deltas, 
&_input_rowsets,
+                                 &_last_delete_version, &compaction_score);
 
     if (_input_rowsets.empty()) {
         return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable 
versions");
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp 
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 5875340ec7b..ecf53bd2c68 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -49,7 +49,7 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size
     return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
 }
 
-int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
+int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         CloudTablet* tablet, const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
         const int64_t max_compaction_score, const int64_t min_compaction_score,
         std::vector<RowsetSharedPtr>* input_rowsets, Version* 
last_delete_version,
@@ -213,4 +213,152 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
                    : last_cumulative_point;
 }
 
+int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* 
last_delete_version,
+        size_t* compaction_score, bool allow_delete) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int64_t compaction_goal_size_mbytes =
+            tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (const auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions 
before.
+                // we should compact those version before handling them over 
to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, 
skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                total_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the 
requirement of parameter compaction_goal_size
+        if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
+            if (input_rowsets->size() == 1 &&
+                
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
+                // Only 1 non-overlapping rowset, skip it
+                input_rowsets->clear();
+                *compaction_score = 0;
+                total_size = 0;
+                continue;
+            }
+            return transient_size;
+        }
+    }
+
+    // if there is delete version, do compaction directly
+    if (last_delete_version->first != -1) {
+        // if there is only one rowset and not overlapping,
+        // we do not need to do cumulative compaction
+        if (input_rowsets->size() == 1 &&
+            !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) 
{
+            input_rowsets->clear();
+            *compaction_score = 0;
+        }
+        return transient_size;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified 
by parameter compaction_file_count_threshold
+    if (*compaction_score >= 
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
+        return transient_size;
+    }
+
+    // Condition 3: level1 achieve compaction_goal_size
+    std::vector<RowsetSharedPtr> level1_rowsets;
+    if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+        int64_t continuous_size = 0;
+        for (const auto& rowset : candidate_rowsets) {
+            const auto& rs_meta = rowset->rowset_meta();
+            if (rs_meta->compaction_level() == 0) {
+                break;
+            }
+            level1_rowsets.push_back(rowset);
+            continuous_size += rs_meta->total_disk_size();
+            if (level1_rowsets.size() >= 2) {
+                if (continuous_size >= compaction_goal_size_mbytes * 1024 * 
1024) {
+                    input_rowsets->swap(level1_rowsets);
+                    return input_rowsets->size();
+                }
+            }
+        }
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 4: the time interval between compactions exceeds the 
value specified by parameter compaction_time_threshold_second
+        if (cumu_interval >
+            
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 
1000)) {
+            if 
(tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+                if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
+                    input_rowsets->swap(level1_rowsets);
+                    return input_rowsets->size();
+                }
+            }
+            return transient_size;
+        }
+    }
+
+    input_rowsets->clear();
+    *compaction_score = 0;
+
+    return 0;
+}
+
+int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(
+        const std::vector<RowsetSharedPtr>& input_rowsets) {
+    int64_t first_level = 0;
+    for (size_t i = 0; i < input_rowsets.size(); i++) {
+        int64_t cur_level = 
input_rowsets[i]->rowset_meta()->compaction_level();
+        if (i == 0) {
+            first_level = cur_level;
+        } else {
+            if (first_level != cur_level) {
+                LOG(ERROR) << "Failed to check compaction level, first_level: 
" << first_level
+                           << ", cur_level: " << cur_level;
+            }
+        }
+    }
+    return first_level + 1;
+}
+
+int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
+        CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& 
last_delete_version,
+        int64_t last_cumulative_point) {
+    if (tablet->tablet_state() != TABLET_RUNNING || 
output_rowset->num_segments() == 0) {
+        return last_cumulative_point;
+    }
+
+    if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
+        output_rowset->rowset_meta()->compaction_level() < 2) {
+        return last_cumulative_point;
+    }
+
+    return output_rowset->end_version() + 1;
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.h 
b/be/src/cloud/cloud_cumulative_compaction_policy.h
index dffe6e0cd0f..17edc41859e 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.h
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.h
@@ -34,7 +34,26 @@ namespace doris {
 class Tablet;
 struct Version;
 
-class CloudSizeBasedCumulativeCompactionPolicy {
+class CloudCumulativeCompactionPolicy {
+public:
+    virtual ~CloudCumulativeCompactionPolicy() = default;
+
+    virtual int64_t new_cumulative_point(CloudTablet* tablet, const 
RowsetSharedPtr& output_rowset,
+                                         Version& last_delete_version,
+                                         int64_t last_cumulative_point) = 0;
+
+    virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& 
input_rowsets) = 0;
+
+    virtual int32_t pick_input_rowsets(CloudTablet* tablet,
+                                       const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                                       const int64_t max_compaction_score,
+                                       const int64_t min_compaction_score,
+                                       std::vector<RowsetSharedPtr>* 
input_rowsets,
+                                       Version* last_delete_version, size_t* 
compaction_score,
+                                       bool allow_delete = false) = 0;
+};
+
+class CloudSizeBasedCumulativeCompactionPolicy : public 
CloudCumulativeCompactionPolicy {
 public:
     CloudSizeBasedCumulativeCompactionPolicy(
             int64_t promotion_size = config::compaction_promotion_size_mbytes 
* 1024 * 1024,
@@ -43,17 +62,23 @@ public:
             int64_t compaction_min_size = config::compaction_min_size_mbytes * 
1024 * 1024,
             int64_t promotion_version_count = 
config::compaction_promotion_version_count);
 
-    ~CloudSizeBasedCumulativeCompactionPolicy() {}
+    ~CloudSizeBasedCumulativeCompactionPolicy() override = default;
 
     int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& 
output_rowset,
-                                 Version& last_delete_version, int64_t 
last_cumulative_point);
+                                 Version& last_delete_version,
+                                 int64_t last_cumulative_point) override;
+
+    int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& 
input_rowsets) override {
+        return 0;
+    }
 
-    int pick_input_rowsets(CloudTablet* tablet,
-                           const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
-                           const int64_t max_compaction_score, const int64_t 
min_compaction_score,
-                           std::vector<RowsetSharedPtr>* input_rowsets,
-                           Version* last_delete_version, size_t* 
compaction_score,
-                           bool allow_delete = false);
+    int32_t pick_input_rowsets(CloudTablet* tablet,
+                               const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                               const int64_t max_compaction_score,
+                               const int64_t min_compaction_score,
+                               std::vector<RowsetSharedPtr>* input_rowsets,
+                               Version* last_delete_version, size_t* 
compaction_score,
+                               bool allow_delete = false) override;
 
 private:
     int64_t _level_size(const int64_t size);
@@ -73,4 +98,24 @@ private:
     int64_t _promotion_version_count;
 };
 
+class CloudTimeSeriesCumulativeCompactionPolicy : public 
CloudCumulativeCompactionPolicy {
+public:
+    CloudTimeSeriesCumulativeCompactionPolicy() = default;
+    ~CloudTimeSeriesCumulativeCompactionPolicy() override = default;
+
+    int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& 
output_rowset,
+                                 Version& last_delete_version,
+                                 int64_t last_cumulative_point) override;
+
+    int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& 
input_rowsets) override;
+
+    int32_t pick_input_rowsets(CloudTablet* tablet,
+                               const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                               const int64_t max_compaction_score,
+                               const int64_t min_compaction_score,
+                               std::vector<RowsetSharedPtr>* input_rowsets,
+                               Version* last_delete_version, size_t* 
compaction_score,
+                               bool allow_delete = false) override;
+};
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_rowset_builder.cpp 
b/be/src/cloud/cloud_rowset_builder.cpp
index b1cbefba544..3585878cd72 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -46,6 +46,10 @@ Status CloudRowsetBuilder::init() {
     }
     RETURN_IF_ERROR(check_tablet_version_count());
 
+    using namespace std::chrono;
+    std::static_pointer_cast<CloudTablet>(_tablet)->last_load_time_ms =
+            
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+
     // build tablet schema in request level
     _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
                                  *_tablet->tablet_schema());
diff --git a/be/src/cloud/cloud_rowset_writer.cpp 
b/be/src/cloud/cloud_rowset_writer.cpp
index 0111b511750..26c3fe714d3 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -46,6 +46,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& 
rowset_writer_context)
     _rowset_meta->set_segments_overlap(_context.segments_overlap);
     _rowset_meta->set_txn_id(_context.txn_id);
     _rowset_meta->set_txn_expiration(_context.txn_expiration);
+    _rowset_meta->set_compaction_level(_context.compaction_level);
     if (_context.rowset_state == PREPARED || _context.rowset_state == 
COMMITTED) {
         _is_pending = true;
         _rowset_meta->set_load_id(_context.load_id);
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 24ceeef4277..96e336b3ef3 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -39,6 +39,7 @@
 #include "io/fs/s3_file_system.h"
 #include "io/hdfs_util.h"
 #include "olap/cumulative_compaction_policy.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/storage_policy.h"
 #include "runtime/memory/cache_manager.h"
@@ -68,9 +69,12 @@ int get_base_thread_num() {
 CloudStorageEngine::CloudStorageEngine(const UniqueId& backend_uid)
         : BaseStorageEngine(Type::CLOUD, backend_uid),
           _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
-          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
-          _cumulative_compaction_policy(
-                  
std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>()) {}
+          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)) {
+    _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
+            std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
+    _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
+            std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
+}
 
 CloudStorageEngine::~CloudStorageEngine() {
     stop();
@@ -766,4 +770,12 @@ Status 
CloudStorageEngine::get_compaction_status_json(std::string* result) {
     return Status::OK();
 }
 
+std::shared_ptr<CloudCumulativeCompactionPolicy> 
CloudStorageEngine::cumu_compaction_policy(
+        std::string_view compaction_policy) {
+    if (!_cumulative_compaction_policies.contains(compaction_policy)) {
+        return 
_cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY);
+    }
+    return _cumulative_compaction_policies.at(compaction_policy);
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index b8c4da9cf64..297050360df 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -87,10 +87,6 @@ public:
     void get_cumu_compaction(int64_t tablet_id,
                              
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res);
 
-    CloudSizeBasedCumulativeCompactionPolicy* cumu_compaction_policy() const {
-        return _cumulative_compaction_policy.get();
-    }
-
     Status submit_compaction_task(const CloudTabletSPtr& tablet, 
CompactionType compaction_type);
 
     Status get_compaction_status_json(std::string* result);
@@ -110,6 +106,9 @@ public:
         return _submitted_full_compactions.count(tablet_id);
     }
 
+    std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
+            std::string_view compaction_policy);
+
 private:
     void _refresh_storage_vault_info_thread_callback();
     void _vacuum_stale_rowsets_thread_callback();
@@ -151,7 +150,9 @@ private:
     std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
     std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
 
-    std::shared_ptr<CloudSizeBasedCumulativeCompactionPolicy> 
_cumulative_compaction_policy;
+    using CumuPolices =
+            std::unordered_map<std::string_view, 
std::shared_ptr<CloudCumulativeCompactionPolicy>>;
+    CumuPolices _cumulative_compaction_policies;
 };
 
 } // namespace doris
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index f4941deff0e..817df0e0e17 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -32,6 +32,7 @@
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet_mgr.h"
 #include "io/cache/block_file_cache_factory.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset.h"
@@ -409,6 +410,23 @@ Result<std::unique_ptr<RowsetWriter>> 
CloudTablet::create_transient_rowset_write
 }
 
 int64_t CloudTablet::get_cloud_base_compaction_score() const {
+    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
+        bool has_delete = false;
+        int64_t point = cumulative_layer_point();
+        for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
+            if (rs_meta->start_version() >= point) {
+                continue;
+            }
+            if (rs_meta->has_delete_predicate()) {
+                has_delete = true;
+                break;
+            }
+        }
+        if (!has_delete) {
+            return 0;
+        }
+    }
+
     return _approximate_num_rowsets.load(std::memory_order_relaxed) -
            _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
 }
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp 
b/be/src/cloud/cloud_tablet_mgr.cpp
index 5c4c9af2e7b..06bea6db126 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -327,6 +327,7 @@ Status CloudTabletMgr::get_topn_tablets_to_compact(
         if (t == nullptr) { continue; }
 
         int64_t s = score(t.get());
+        if (s <= 0) { continue; }
         if (s > *max_score) {
             max_score_tablet_id = t->tablet_id();
             *max_score = s;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index a263d42ec6c..dec407894ef 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1117,6 +1117,11 @@ Status 
CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
     ctx.tablet_schema = _cur_tablet_schema;
     ctx.newest_write_timestamp = _newest_write_timestamp;
     ctx.write_type = DataWriteType::TYPE_COMPACTION;
+
+    auto compaction_policy = _tablet->tablet_meta()->compaction_policy();
+    ctx.compaction_level =
+            
_engine.cumu_compaction_policy(compaction_policy)->new_compaction_level(_input_rowsets);
+
     _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, 
_is_vertical));
     
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get()));
     return Status::OK();
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h
index e578a9a09ad..366af8cba4e 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -102,6 +102,8 @@ struct RowsetWriterContext {
     // In semi-structure senario tablet_schema will be updated concurrently,
     // this lock need to be held when update.Use shared_ptr to avoid delete 
copy contructor
     std::shared_ptr<std::mutex> schema_lock;
+
+    int64_t compaction_level = 0;
 };
 
 } // namespace doris
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index e2a19bce339..e27b6b5b944 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -697,6 +697,23 @@ void 
MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
             
tablet_meta.set_group_commit_interval_ms(tablet_meta_info.group_commit_interval_ms());
         } else if (tablet_meta_info.has_group_commit_data_bytes()) {
             
tablet_meta.set_group_commit_data_bytes(tablet_meta_info.group_commit_data_bytes());
+        } else if (tablet_meta_info.has_compaction_policy()) {
+            
tablet_meta.set_compaction_policy(tablet_meta_info.compaction_policy());
+        } else if 
(tablet_meta_info.has_time_series_compaction_goal_size_mbytes()) {
+            tablet_meta.set_time_series_compaction_goal_size_mbytes(
+                    
tablet_meta_info.time_series_compaction_goal_size_mbytes());
+        } else if 
(tablet_meta_info.has_time_series_compaction_file_count_threshold()) {
+            tablet_meta.set_time_series_compaction_file_count_threshold(
+                    
tablet_meta_info.time_series_compaction_file_count_threshold());
+        } else if 
(tablet_meta_info.has_time_series_compaction_time_threshold_seconds()) {
+            tablet_meta.set_time_series_compaction_time_threshold_seconds(
+                    
tablet_meta_info.time_series_compaction_time_threshold_seconds());
+        } else if 
(tablet_meta_info.has_time_series_compaction_empty_rowsets_threshold()) {
+            tablet_meta.set_time_series_compaction_empty_rowsets_threshold(
+                    
tablet_meta_info.time_series_compaction_empty_rowsets_threshold());
+        } else if 
(tablet_meta_info.has_time_series_compaction_level_threshold()) {
+            tablet_meta.set_time_series_compaction_level_threshold(
+                    tablet_meta_info.time_series_compaction_level_threshold());
         }
         int64_t table_id = tablet_meta.table_id();
         int64_t index_id = tablet_meta.index_id();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 2a7a2bc2f55..4bc34582fff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -189,7 +189,12 @@ public class CloudRollupJobV2 extends RollupJobV2 {
                             tbl.isInMemory(), true,
                             tbl.getName(), tbl.getTTLSeconds(),
                             tbl.getEnableUniqueKeyMergeOnWrite(), 
tbl.storeRowColumn(),
-                            tbl.getBaseSchemaVersion());
+                            tbl.getBaseSchemaVersion(), 
tbl.getCompactionPolicy(),
+                            tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+                            tbl.getTimeSeriesCompactionFileCountThreshold(),
+                            tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
+                            tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+                            tbl.getTimeSeriesCompactionLevelThreshold());
                 requestBuilder.addTabletMetas(builder);
             } // end for rollupTablets
             ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index 4f474d188d4..cdfcf0a5e20 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -201,7 +201,12 @@ public class CloudSchemaChangeJobV2 extends 
SchemaChangeJobV2 {
                                 tbl.getStoragePolicy(), tbl.isInMemory(), true,
                                 tbl.getName(), tbl.getTTLSeconds(),
                                 tbl.getEnableUniqueKeyMergeOnWrite(), 
tbl.storeRowColumn(),
-                                shadowSchemaVersion);
+                                shadowSchemaVersion, tbl.getCompactionPolicy(),
+                                tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+                                
tbl.getTimeSeriesCompactionFileCountThreshold(),
+                                
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
+                                
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+                                tbl.getTimeSeriesCompactionLevelThreshold());
                     requestBuilder.addTabletMetas(builder);
                 } // end for rollupTablets
                 ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 786769bd102..f35968ee22a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2185,7 +2185,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         if (tableProperty != null) {
             return tableProperty.compactionPolicy();
         }
-        return "";
+        return PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
     }
 
     public void setTimeSeriesCompactionGoalSizeMbytes(long 
timeSeriesCompactionGoalSizeMbytes) {
@@ -2199,7 +2199,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         if (tableProperty != null) {
             return tableProperty.timeSeriesCompactionGoalSizeMbytes();
         }
-        return null;
+        return 
PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
     }
 
     public void setTimeSeriesCompactionFileCountThreshold(long 
timeSeriesCompactionFileCountThreshold) {
@@ -2213,7 +2213,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         if (tableProperty != null) {
             return tableProperty.timeSeriesCompactionFileCountThreshold();
         }
-        return null;
+        return 
PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
     }
 
     public void setTimeSeriesCompactionTimeThresholdSeconds(long 
timeSeriesCompactionTimeThresholdSeconds) {
@@ -2228,7 +2228,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         if (tableProperty != null) {
             return tableProperty.timeSeriesCompactionTimeThresholdSeconds();
         }
-        return null;
+        return 
PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
     }
 
     public void setTimeSeriesCompactionEmptyRowsetsThreshold(long 
timeSeriesCompactionEmptyRowsetsThreshold) {
@@ -2242,7 +2242,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         if (tableProperty != null) {
             return tableProperty.timeSeriesCompactionEmptyRowsetsThreshold();
         }
-        return null;
+        return 
PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE;
     }
 
     public void setTimeSeriesCompactionLevelThreshold(long 
timeSeriesCompactionLevelThreshold) {
@@ -2256,7 +2256,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         if (tableProperty != null) {
             return tableProperty.timeSeriesCompactionLevelThreshold();
         }
-        return null;
+        return 
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
     }
 
     public int getBaseSchemaVersion() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
index 573b394e79f..48602a0a8a4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
@@ -92,7 +92,13 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
             throws UserException {
         
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
-                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS));
+                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)
+                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)
+                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)
+                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
+                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
+                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
+                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
 
         if (properties.size() != 1) {
             throw new UserException("Can only set one table property at a 
time");
@@ -149,6 +155,123 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
             }
             param.groupCommitDataBytes = groupCommitDataBytes;
             param.type = 
UpdatePartitionMetaParam.TabletMetaType.GROUP_COMMIT_DATA_BYTES;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+            String compactionPolicy = 
properties.get(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY);
+            if (compactionPolicy != null
+                    && 
!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
+                    && 
!compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) {
+                throw new UserException("Table compaction policy only support 
for "
+                        + PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY
+                        + " or " + 
PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
+            }
+            olapTable.readLock();
+            try {
+                if (compactionPolicy == olapTable.getCompactionPolicy()) {
+                    LOG.info("compactionPolicy:{} is equal with 
olapTable.getCompactionPolicy():{}",
+                            compactionPolicy, olapTable.getCompactionPolicy());
+                    return;
+                }
+                partitions.addAll(olapTable.getPartitions());
+            } finally {
+                olapTable.readUnlock();
+            }
+            param.compactionPolicy = compactionPolicy;
+            param.type = 
UpdatePartitionMetaParam.TabletMetaType.COMPACTION_POLICY;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
 {
+            long timeSeriesCompactionGoalSizeMbytes = 
Long.parseLong(properties.get(PropertyAnalyzer
+                    .PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES));
+            olapTable.readLock();
+            try {
+                if (timeSeriesCompactionGoalSizeMbytes
+                        == olapTable.getTimeSeriesCompactionGoalSizeMbytes()) {
+                    LOG.info("timeSeriesCompactionGoalSizeMbytes:{} is equal 
with"
+                                    + " 
olapTable.timeSeriesCompactionGoalSizeMbytes():{}",
+                            timeSeriesCompactionGoalSizeMbytes,
+                            olapTable.getTimeSeriesCompactionGoalSizeMbytes());
+                    return;
+                }
+                partitions.addAll(olapTable.getPartitions());
+            } finally {
+                olapTable.readUnlock();
+            }
+            param.timeSeriesCompactionGoalSizeMbytes = 
timeSeriesCompactionGoalSizeMbytes;
+            param.type = 
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
 {
+            long timeSeriesCompactionFileCountThreshold = 
Long.parseLong(properties.get(PropertyAnalyzer
+                    .PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD));
+            olapTable.readLock();
+            try {
+                if (timeSeriesCompactionFileCountThreshold
+                        == 
olapTable.getTimeSeriesCompactionFileCountThreshold()) {
+                    LOG.info("timeSeriesCompactionFileCountThreshold:{} is 
equal with"
+                                    + " 
olapTable.getTimeSeriesCompactionFileCountThreshold():{}",
+                            timeSeriesCompactionFileCountThreshold,
+                            
olapTable.getTimeSeriesCompactionFileCountThreshold());
+                    return;
+                }
+                partitions.addAll(olapTable.getPartitions());
+            } finally {
+                olapTable.readUnlock();
+            }
+            param.timeSeriesCompactionFileCountThreshold = 
timeSeriesCompactionFileCountThreshold;
+            param.type = 
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
 {
+            long timeSeriesCompactionTimeThresholdSeconds = 
Long.parseLong(properties.get(PropertyAnalyzer
+                    
.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS));
+            olapTable.readLock();
+            try {
+                if (timeSeriesCompactionTimeThresholdSeconds
+                        == 
olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) {
+                    LOG.info("timeSeriesCompactionTimeThresholdSeconds:{} is 
equal with"
+                                    + " 
olapTable.getTimeSeriesCompactionTimeThresholdSeconds():{}",
+                            timeSeriesCompactionTimeThresholdSeconds,
+                            
olapTable.getTimeSeriesCompactionTimeThresholdSeconds());
+                    return;
+                }
+                partitions.addAll(olapTable.getPartitions());
+            } finally {
+                olapTable.readUnlock();
+            }
+            param.timeSeriesCompactionTimeThresholdSeconds = 
timeSeriesCompactionTimeThresholdSeconds;
+            param.type = 
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD))
 {
+            long timeSeriesCompactionEmptyRowsetsThreshold = 
Long.parseLong(properties.get(PropertyAnalyzer
+                    
.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD));
+            olapTable.readLock();
+            try {
+                if (timeSeriesCompactionEmptyRowsetsThreshold
+                        == 
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()) {
+                    LOG.info("timeSeriesCompactionEmptyRowsetsThreshold:{} is 
equal with"
+                                    + " 
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold():{}",
+                            timeSeriesCompactionEmptyRowsetsThreshold,
+                            
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold());
+                    return;
+                }
+                partitions.addAll(olapTable.getPartitions());
+            } finally {
+                olapTable.readUnlock();
+            }
+            param.timeSeriesCompactionEmptyRowsetsThreshold = 
timeSeriesCompactionEmptyRowsetsThreshold;
+            param.type = 
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
 {
+            long timeSeriesCompactionLevelThreshold = 
Long.parseLong(properties.get(PropertyAnalyzer
+                    .PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
+            olapTable.readLock();
+            try {
+                if (timeSeriesCompactionLevelThreshold
+                        == olapTable.getTimeSeriesCompactionLevelThreshold()) {
+                    LOG.info("timeSeriesCompactionLevelThreshold:{} is equal 
with"
+                                    + " 
olapTable.getTimeSeriesCompactionLevelThreshold():{}",
+                            timeSeriesCompactionLevelThreshold,
+                            olapTable.getTimeSeriesCompactionLevelThreshold());
+                    return;
+                }
+                partitions.addAll(olapTable.getPartitions());
+            } finally {
+                olapTable.readUnlock();
+            }
+            param.timeSeriesCompactionLevelThreshold = 
timeSeriesCompactionLevelThreshold;
+            param.type = 
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD;
         } else {
             LOG.warn("invalid properties:{}", properties);
             throw new UserException("invalid properties");
@@ -173,6 +296,12 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
             TTL_SECONDS,
             GROUP_COMMIT_INTERVAL_MS,
             GROUP_COMMIT_DATA_BYTES,
+            COMPACTION_POLICY,
+            TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
+            TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
+            TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
+            TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD,
+            TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
         }
 
         TabletMetaType type;
@@ -181,6 +310,12 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
         long ttlSeconds = 0;
         long groupCommitIntervalMs = 0;
         long groupCommitDataBytes = 0;
+        String compactionPolicy;
+        long timeSeriesCompactionGoalSizeMbytes = 0;
+        long timeSeriesCompactionFileCountThreshold = 0;
+        long timeSeriesCompactionTimeThresholdSeconds = 0;
+        long timeSeriesCompactionEmptyRowsetsThreshold = 0;
+        long timeSeriesCompactionLevelThreshold = 0;
     }
 
     public void updateCloudPartitionMeta(Database db,
@@ -228,6 +363,29 @@ public class CloudSchemaChangeHandler extends 
SchemaChangeHandler {
                     case GROUP_COMMIT_DATA_BYTES:
                         
infoBuilder.setGroupCommitDataBytes(param.groupCommitDataBytes);
                         break;
+                    case COMPACTION_POLICY:
+                        
infoBuilder.setCompactionPolicy(param.compactionPolicy);
+                        break;
+                    case TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES:
+                        infoBuilder.setTimeSeriesCompactionGoalSizeMbytes(
+                                param.timeSeriesCompactionGoalSizeMbytes);
+                        break;
+                    case TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD:
+                        infoBuilder.setTimeSeriesCompactionFileCountThreshold(
+                                param.timeSeriesCompactionFileCountThreshold);
+                        break;
+                    case TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS:
+                        
infoBuilder.setTimeSeriesCompactionTimeThresholdSeconds(
+                                
param.timeSeriesCompactionTimeThresholdSeconds);
+                        break;
+                    case TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD:
+                        
infoBuilder.setTimeSeriesCompactionEmptyRowsetsThreshold(
+                                
param.timeSeriesCompactionEmptyRowsetsThreshold);
+                        break;
+                    case TIME_SERIES_COMPACTION_LEVEL_THRESHOLD:
+                        infoBuilder.setTimeSeriesCompactionLevelThreshold(
+                                param.timeSeriesCompactionLevelThreshold);
+                        break;
                     default:
                         throw new UserException("Unknown TabletMetaType");
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 1eaf415b152..42ca4bfe99c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -155,7 +155,12 @@ public class CloudInternalCatalog extends InternalCatalog {
                         partitionId, tablet, tabletType, schemaHash, keysType, 
shortKeyColumnCount,
                         bfColumns, tbl.getBfFpp(), indexes, columns, 
tbl.getDataSortInfo(),
                         tbl.getCompressionType(), storagePolicy, isInMemory, 
false, tbl.getName(), tbl.getTTLSeconds(),
-                        tbl.getEnableUniqueKeyMergeOnWrite(), 
tbl.storeRowColumn(), indexMeta.getSchemaVersion());
+                        tbl.getEnableUniqueKeyMergeOnWrite(), 
tbl.storeRowColumn(), indexMeta.getSchemaVersion(),
+                        tbl.getCompactionPolicy(), 
tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+                        tbl.getTimeSeriesCompactionFileCountThreshold(),
+                        tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
+                        tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+                        tbl.getTimeSeriesCompactionLevelThreshold());
                 requestBuilder.addTabletMetas(builder);
             }
             if (!storageVaultIdSet && ((CloudEnv) 
Env.getCurrentEnv()).getEnableStorageVault()) {
@@ -198,7 +203,10 @@ public class CloudInternalCatalog extends InternalCatalog {
             List<Column> schemaColumns, DataSortInfo dataSortInfo, 
TCompressionType compressionType,
             String storagePolicy, boolean isInMemory, boolean isShadow,
             String tableName, long ttlSeconds, boolean 
enableUniqueKeyMergeOnWrite,
-            boolean storeRowColumn, int schemaVersion) throws DdlException {
+            boolean storeRowColumn, int schemaVersion, String compactionPolicy,
+            Long timeSeriesCompactionGoalSizeMbytes, Long 
timeSeriesCompactionFileCountThreshold,
+            Long timeSeriesCompactionTimeThresholdSeconds, Long 
timeSeriesCompactionEmptyRowsetsThreshold,
+            Long timeSeriesCompactionLevelThreshold) throws DdlException {
         OlapFile.TabletMetaCloudPB.Builder builder = 
OlapFile.TabletMetaCloudPB.newBuilder();
         builder.setTableId(tableId);
         builder.setIndexId(indexId);
@@ -227,6 +235,13 @@ public class CloudInternalCatalog extends InternalCatalog {
         builder.setReplicaId(tablet.getReplicas().get(0).getId());
         builder.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
 
+        builder.setCompactionPolicy(compactionPolicy);
+        
builder.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes);
+        
builder.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
+        
builder.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
+        
builder.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
+        
builder.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
+
         OlapFile.TabletSchemaCloudPB.Builder schemaBuilder = 
OlapFile.TabletSchemaCloudPB.newBuilder();
         schemaBuilder.setSchemaVersion(schemaVersion);
 
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 3054b9245f5..b856d50206b 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -475,6 +475,12 @@ message TabletMetaInfoPB { // For update tablet meta
     optional int64 ttl_seconds = 4;
     optional int64 group_commit_interval_ms = 5;
     optional int64 group_commit_data_bytes = 6;
+    optional string compaction_policy = 7;
+    optional int64 time_series_compaction_goal_size_mbytes = 8;
+    optional int64 time_series_compaction_file_count_threshold = 9;
+    optional int64 time_series_compaction_time_threshold_seconds = 10;
+    optional int64 time_series_compaction_empty_rowsets_threshold = 11;
+    optional int64 time_series_compaction_level_threshold = 12;
 }
 
 message TabletCompactionJobPB {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to