This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 62a5bc39bff [fix](inverted index) cloud mod supports time series (#33414)
62a5bc39bff is described below
commit 62a5bc39bffd2325af852858c2325f9f047ebbf7
Author: zzzxl <[email protected]>
AuthorDate: Wed Apr 10 17:56:13 2024 +0800
[fix](inverted index) cloud mod supports time series (#33414)
---
be/src/cloud/cloud_cumulative_compaction.cpp | 17 ++-
.../cloud/cloud_cumulative_compaction_policy.cpp | 150 ++++++++++++++++++-
be/src/cloud/cloud_cumulative_compaction_policy.h | 63 ++++++--
be/src/cloud/cloud_rowset_builder.cpp | 4 +
be/src/cloud/cloud_rowset_writer.cpp | 1 +
be/src/cloud/cloud_storage_engine.cpp | 18 ++-
be/src/cloud/cloud_storage_engine.h | 11 +-
be/src/cloud/cloud_tablet.cpp | 18 +++
be/src/cloud/cloud_tablet_mgr.cpp | 1 +
be/src/olap/compaction.cpp | 5 +
be/src/olap/rowset/rowset_writer_context.h | 2 +
cloud/src/meta-service/meta_service.cpp | 17 +++
.../org/apache/doris/alter/CloudRollupJobV2.java | 7 +-
.../apache/doris/alter/CloudSchemaChangeJobV2.java | 7 +-
.../java/org/apache/doris/catalog/OlapTable.java | 12 +-
.../cloud/alter/CloudSchemaChangeHandler.java | 160 ++++++++++++++++++++-
.../cloud/datasource/CloudInternalCatalog.java | 19 ++-
gensrc/proto/cloud.proto | 6 +
18 files changed, 483 insertions(+), 35 deletions(-)
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp
b/be/src/cloud/cloud_cumulative_compaction.cpp
index 4e43246eebd..39ce711c9db 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -200,8 +200,11 @@ Status CloudCumulativeCompaction::execute_compact() {
Status CloudCumulativeCompaction::modify_rowsets() {
// calculate new cumulative point
int64_t input_cumulative_point = cloud_tablet()->cumulative_layer_point();
- int64_t new_cumulative_point =
_engine.cumu_compaction_policy()->new_cumulative_point(
- cloud_tablet(), _output_rowset, _last_delete_version,
input_cumulative_point);
+ auto compaction_policy =
cloud_tablet()->tablet_meta()->compaction_policy();
+ int64_t new_cumulative_point =
+ _engine.cumu_compaction_policy(compaction_policy)
+ ->new_cumulative_point(cloud_tablet(), _output_rowset,
_last_delete_version,
+ input_cumulative_point);
// commit compaction job
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
@@ -352,10 +355,12 @@ Status
CloudCumulativeCompaction::pick_rowsets_to_compact() {
}
size_t compaction_score = 0;
- _engine.cumu_compaction_policy()->pick_input_rowsets(
- cloud_tablet(), candidate_rowsets,
config::cumulative_compaction_max_deltas,
- config::cumulative_compaction_min_deltas, &_input_rowsets,
&_last_delete_version,
- &compaction_score);
+ auto compaction_policy =
cloud_tablet()->tablet_meta()->compaction_policy();
+ _engine.cumu_compaction_policy(compaction_policy)
+ ->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
+ config::cumulative_compaction_max_deltas,
+ config::cumulative_compaction_min_deltas,
&_input_rowsets,
+ &_last_delete_version, &compaction_score);
if (_input_rowsets.empty()) {
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable
versions");
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 5875340ec7b..ecf53bd2c68 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -49,7 +49,7 @@ int64_t
CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size
return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
}
-int CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
+int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>&
candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version*
last_delete_version,
@@ -213,4 +213,152 @@ int64_t
CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
: last_cumulative_point;
}
+int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+ CloudTablet* tablet, const std::vector<RowsetSharedPtr>&
candidate_rowsets,
+ const int64_t max_compaction_score, const int64_t min_compaction_score,
+ std::vector<RowsetSharedPtr>* input_rowsets, Version*
last_delete_version,
+ size_t* compaction_score, bool allow_delete) {
+ if (tablet->tablet_state() == TABLET_NOTREADY) {
+ return 0;
+ }
+
+ int64_t compaction_goal_size_mbytes =
+ tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
+
+ int transient_size = 0;
+ *compaction_score = 0;
+ input_rowsets->clear();
+ int64_t total_size = 0;
+
+ for (const auto& rowset : candidate_rowsets) {
+ // check whether this rowset is delete version
+ if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
+ *last_delete_version = rowset->version();
+ if (!input_rowsets->empty()) {
+ // we meet a delete version, and there were other versions before.
+ // we should compact those version before handling them over to base compaction
+ break;
+ } else {
+ // we meet a delete version, and no other versions before, skip it and continue
+ input_rowsets->clear();
+ *compaction_score = 0;
+ transient_size = 0;
+ total_size = 0;
+ continue;
+ }
+ }
+
+ *compaction_score += rowset->rowset_meta()->get_compaction_score();
+ total_size += rowset->rowset_meta()->total_disk_size();
+
+ transient_size += 1;
+ input_rowsets->push_back(rowset);
+
+ // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
+ if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
+ if (input_rowsets->size() == 1 &&
+
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
+ // Only 1 non-overlapping rowset, skip it
+ input_rowsets->clear();
+ *compaction_score = 0;
+ total_size = 0;
+ continue;
+ }
+ return transient_size;
+ }
+ }
+
+ // if there is delete version, do compaction directly
+ if (last_delete_version->first != -1) {
+ // if there is only one rowset and not overlapping,
+ // we do not need to do cumulative compaction
+ if (input_rowsets->size() == 1 &&
+ !input_rowsets->front()->rowset_meta()->is_segments_overlapping())
{
+ input_rowsets->clear();
+ *compaction_score = 0;
+ }
+ return transient_size;
+ }
+
+ // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
+ if (*compaction_score >=
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
+ return transient_size;
+ }
+
+ // Condition 3: level1 achieve compaction_goal_size
+ std::vector<RowsetSharedPtr> level1_rowsets;
+ if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+ int64_t continuous_size = 0;
+ for (const auto& rowset : candidate_rowsets) {
+ const auto& rs_meta = rowset->rowset_meta();
+ if (rs_meta->compaction_level() == 0) {
+ break;
+ }
+ level1_rowsets.push_back(rowset);
+ continuous_size += rs_meta->total_disk_size();
+ if (level1_rowsets.size() >= 2) {
+ if (continuous_size >= compaction_goal_size_mbytes * 1024 *
1024) {
+ input_rowsets->swap(level1_rowsets);
+ return input_rowsets->size();
+ }
+ }
+ }
+ }
+
+ int64_t now = UnixMillis();
+ int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+ if (last_cumu != 0) {
+ int64_t cumu_interval = now - last_cumu;
+
+ // Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second
+ if (cumu_interval >
+
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
+ if
(tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+ if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
+ input_rowsets->swap(level1_rowsets);
+ return input_rowsets->size();
+ }
+ }
+ return transient_size;
+ }
+ }
+
+ input_rowsets->clear();
+ *compaction_score = 0;
+
+ return 0;
+}
+
+int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(
+ const std::vector<RowsetSharedPtr>& input_rowsets) {
+ int64_t first_level = 0;
+ for (size_t i = 0; i < input_rowsets.size(); i++) {
+ int64_t cur_level =
input_rowsets[i]->rowset_meta()->compaction_level();
+ if (i == 0) {
+ first_level = cur_level;
+ } else {
+ if (first_level != cur_level) {
+ LOG(ERROR) << "Failed to check compaction level, first_level:
" << first_level
+ << ", cur_level: " << cur_level;
+ }
+ }
+ }
+ return first_level + 1;
+}
+
+int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
+ CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version&
last_delete_version,
+ int64_t last_cumulative_point) {
+ if (tablet->tablet_state() != TABLET_RUNNING ||
output_rowset->num_segments() == 0) {
+ return last_cumulative_point;
+ }
+
+ if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
+ output_rowset->rowset_meta()->compaction_level() < 2) {
+ return last_cumulative_point;
+ }
+
+ return output_rowset->end_version() + 1;
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.h
b/be/src/cloud/cloud_cumulative_compaction_policy.h
index dffe6e0cd0f..17edc41859e 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.h
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.h
@@ -34,7 +34,26 @@ namespace doris {
class Tablet;
struct Version;
-class CloudSizeBasedCumulativeCompactionPolicy {
+class CloudCumulativeCompactionPolicy {
+public:
+ virtual ~CloudCumulativeCompactionPolicy() = default;
+
+ virtual int64_t new_cumulative_point(CloudTablet* tablet, const
RowsetSharedPtr& output_rowset,
+ Version& last_delete_version,
+ int64_t last_cumulative_point) = 0;
+
+ virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>&
input_rowsets) = 0;
+
+ virtual int32_t pick_input_rowsets(CloudTablet* tablet,
+ const std::vector<RowsetSharedPtr>&
candidate_rowsets,
+ const int64_t max_compaction_score,
+ const int64_t min_compaction_score,
+ std::vector<RowsetSharedPtr>*
input_rowsets,
+ Version* last_delete_version, size_t*
compaction_score,
+ bool allow_delete = false) = 0;
+};
+
+class CloudSizeBasedCumulativeCompactionPolicy : public
CloudCumulativeCompactionPolicy {
public:
CloudSizeBasedCumulativeCompactionPolicy(
int64_t promotion_size = config::compaction_promotion_size_mbytes
* 1024 * 1024,
@@ -43,17 +62,23 @@ public:
int64_t compaction_min_size = config::compaction_min_size_mbytes *
1024 * 1024,
int64_t promotion_version_count =
config::compaction_promotion_version_count);
- ~CloudSizeBasedCumulativeCompactionPolicy() {}
+ ~CloudSizeBasedCumulativeCompactionPolicy() override = default;
int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr&
output_rowset,
- Version& last_delete_version, int64_t
last_cumulative_point);
+ Version& last_delete_version,
+ int64_t last_cumulative_point) override;
+
+ int64_t new_compaction_level(const std::vector<RowsetSharedPtr>&
input_rowsets) override {
+ return 0;
+ }
- int pick_input_rowsets(CloudTablet* tablet,
- const std::vector<RowsetSharedPtr>&
candidate_rowsets,
- const int64_t max_compaction_score, const int64_t
min_compaction_score,
- std::vector<RowsetSharedPtr>* input_rowsets,
- Version* last_delete_version, size_t*
compaction_score,
- bool allow_delete = false);
+ int32_t pick_input_rowsets(CloudTablet* tablet,
+ const std::vector<RowsetSharedPtr>&
candidate_rowsets,
+ const int64_t max_compaction_score,
+ const int64_t min_compaction_score,
+ std::vector<RowsetSharedPtr>* input_rowsets,
+ Version* last_delete_version, size_t*
compaction_score,
+ bool allow_delete = false) override;
private:
int64_t _level_size(const int64_t size);
@@ -73,4 +98,24 @@ private:
int64_t _promotion_version_count;
};
+class CloudTimeSeriesCumulativeCompactionPolicy : public
CloudCumulativeCompactionPolicy {
+public:
+ CloudTimeSeriesCumulativeCompactionPolicy() = default;
+ ~CloudTimeSeriesCumulativeCompactionPolicy() override = default;
+
+ int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr&
output_rowset,
+ Version& last_delete_version,
+ int64_t last_cumulative_point) override;
+
+ int64_t new_compaction_level(const std::vector<RowsetSharedPtr>&
input_rowsets) override;
+
+ int32_t pick_input_rowsets(CloudTablet* tablet,
+ const std::vector<RowsetSharedPtr>&
candidate_rowsets,
+ const int64_t max_compaction_score,
+ const int64_t min_compaction_score,
+ std::vector<RowsetSharedPtr>* input_rowsets,
+ Version* last_delete_version, size_t*
compaction_score,
+ bool allow_delete = false) override;
+};
+
} // namespace doris
diff --git a/be/src/cloud/cloud_rowset_builder.cpp
b/be/src/cloud/cloud_rowset_builder.cpp
index b1cbefba544..3585878cd72 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -46,6 +46,10 @@ Status CloudRowsetBuilder::init() {
}
RETURN_IF_ERROR(check_tablet_version_count());
+ using namespace std::chrono;
+ std::static_pointer_cast<CloudTablet>(_tablet)->last_load_time_ms =
+
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+
// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema());
diff --git a/be/src/cloud/cloud_rowset_writer.cpp
b/be/src/cloud/cloud_rowset_writer.cpp
index 0111b511750..26c3fe714d3 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -46,6 +46,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext&
rowset_writer_context)
_rowset_meta->set_segments_overlap(_context.segments_overlap);
_rowset_meta->set_txn_id(_context.txn_id);
_rowset_meta->set_txn_expiration(_context.txn_expiration);
+ _rowset_meta->set_compaction_level(_context.compaction_level);
if (_context.rowset_state == PREPARED || _context.rowset_state ==
COMMITTED) {
_is_pending = true;
_rowset_meta->set_load_id(_context.load_id);
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 24ceeef4277..96e336b3ef3 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -39,6 +39,7 @@
#include "io/fs/s3_file_system.h"
#include "io/hdfs_util.h"
#include "olap/cumulative_compaction_policy.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/memtable_flush_executor.h"
#include "olap/storage_policy.h"
#include "runtime/memory/cache_manager.h"
@@ -68,9 +69,12 @@ int get_base_thread_num() {
CloudStorageEngine::CloudStorageEngine(const UniqueId& backend_uid)
: BaseStorageEngine(Type::CLOUD, backend_uid),
_meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
- _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
- _cumulative_compaction_policy(
-
std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>()) {}
+ _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)) {
+ _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
+ std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
+ _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
+ std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
+}
CloudStorageEngine::~CloudStorageEngine() {
stop();
@@ -766,4 +770,12 @@ Status
CloudStorageEngine::get_compaction_status_json(std::string* result) {
return Status::OK();
}
+std::shared_ptr<CloudCumulativeCompactionPolicy>
CloudStorageEngine::cumu_compaction_policy(
+ std::string_view compaction_policy) {
+ if (!_cumulative_compaction_policies.contains(compaction_policy)) {
+ return
_cumulative_compaction_policies.at(CUMULATIVE_SIZE_BASED_POLICY);
+ }
+ return _cumulative_compaction_policies.at(compaction_policy);
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index b8c4da9cf64..297050360df 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -87,10 +87,6 @@ public:
void get_cumu_compaction(int64_t tablet_id,
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res);
- CloudSizeBasedCumulativeCompactionPolicy* cumu_compaction_policy() const {
- return _cumulative_compaction_policy.get();
- }
-
Status submit_compaction_task(const CloudTabletSPtr& tablet,
CompactionType compaction_type);
Status get_compaction_status_json(std::string* result);
@@ -110,6 +106,9 @@ public:
return _submitted_full_compactions.count(tablet_id);
}
+ std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
+ std::string_view compaction_policy);
+
private:
void _refresh_storage_vault_info_thread_callback();
void _vacuum_stale_rowsets_thread_callback();
@@ -151,7 +150,9 @@ private:
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
- std::shared_ptr<CloudSizeBasedCumulativeCompactionPolicy>
_cumulative_compaction_policy;
+ using CumuPolices =
+ std::unordered_map<std::string_view,
std::shared_ptr<CloudCumulativeCompactionPolicy>>;
+ CumuPolices _cumulative_compaction_policies;
};
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index f4941deff0e..817df0e0e17 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -32,6 +32,7 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "io/cache/block_file_cache_factory.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
@@ -409,6 +410,23 @@ Result<std::unique_ptr<RowsetWriter>>
CloudTablet::create_transient_rowset_write
}
int64_t CloudTablet::get_cloud_base_compaction_score() const {
+ if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
+ bool has_delete = false;
+ int64_t point = cumulative_layer_point();
+ for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
+ if (rs_meta->start_version() >= point) {
+ continue;
+ }
+ if (rs_meta->has_delete_predicate()) {
+ has_delete = true;
+ break;
+ }
+ }
+ if (!has_delete) {
+ return 0;
+ }
+ }
+
return _approximate_num_rowsets.load(std::memory_order_relaxed) -
_approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
}
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index 5c4c9af2e7b..06bea6db126 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -327,6 +327,7 @@ Status CloudTabletMgr::get_topn_tablets_to_compact(
if (t == nullptr) { continue; }
int64_t s = score(t.get());
+ if (s <= 0) { continue; }
if (s > *max_score) {
max_score_tablet_id = t->tablet_id();
*max_score = s;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 9f699fc3f89..81b41d76ee6 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1109,6 +1109,11 @@ Status
CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
ctx.tablet_schema = _cur_tablet_schema;
ctx.newest_write_timestamp = _newest_write_timestamp;
ctx.write_type = DataWriteType::TYPE_COMPACTION;
+
+ auto compaction_policy = _tablet->tablet_meta()->compaction_policy();
+ ctx.compaction_level =
+
_engine.cumu_compaction_policy(compaction_policy)->new_compaction_level(_input_rowsets);
+
_output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx,
_is_vertical));
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get()));
return Status::OK();
diff --git a/be/src/olap/rowset/rowset_writer_context.h
b/be/src/olap/rowset/rowset_writer_context.h
index e578a9a09ad..366af8cba4e 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -102,6 +102,8 @@ struct RowsetWriterContext {
// In semi-structure senario tablet_schema will be updated concurrently,
// this lock need to be held when update.Use shared_ptr to avoid delete copy contructor
std::shared_ptr<std::mutex> schema_lock;
+
+ int64_t compaction_level = 0;
};
} // namespace doris
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index e2a19bce339..e27b6b5b944 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -697,6 +697,23 @@ void
MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
tablet_meta.set_group_commit_interval_ms(tablet_meta_info.group_commit_interval_ms());
} else if (tablet_meta_info.has_group_commit_data_bytes()) {
tablet_meta.set_group_commit_data_bytes(tablet_meta_info.group_commit_data_bytes());
+ } else if (tablet_meta_info.has_compaction_policy()) {
+
tablet_meta.set_compaction_policy(tablet_meta_info.compaction_policy());
+ } else if
(tablet_meta_info.has_time_series_compaction_goal_size_mbytes()) {
+ tablet_meta.set_time_series_compaction_goal_size_mbytes(
+
tablet_meta_info.time_series_compaction_goal_size_mbytes());
+ } else if
(tablet_meta_info.has_time_series_compaction_file_count_threshold()) {
+ tablet_meta.set_time_series_compaction_file_count_threshold(
+
tablet_meta_info.time_series_compaction_file_count_threshold());
+ } else if
(tablet_meta_info.has_time_series_compaction_time_threshold_seconds()) {
+ tablet_meta.set_time_series_compaction_time_threshold_seconds(
+
tablet_meta_info.time_series_compaction_time_threshold_seconds());
+ } else if
(tablet_meta_info.has_time_series_compaction_empty_rowsets_threshold()) {
+ tablet_meta.set_time_series_compaction_empty_rowsets_threshold(
+
tablet_meta_info.time_series_compaction_empty_rowsets_threshold());
+ } else if
(tablet_meta_info.has_time_series_compaction_level_threshold()) {
+ tablet_meta.set_time_series_compaction_level_threshold(
+ tablet_meta_info.time_series_compaction_level_threshold());
}
int64_t table_id = tablet_meta.table_id();
int64_t index_id = tablet_meta.index_id();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 2a7a2bc2f55..4bc34582fff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -189,7 +189,12 @@ public class CloudRollupJobV2 extends RollupJobV2 {
tbl.isInMemory(), true,
tbl.getName(), tbl.getTTLSeconds(),
tbl.getEnableUniqueKeyMergeOnWrite(),
tbl.storeRowColumn(),
- tbl.getBaseSchemaVersion());
+ tbl.getBaseSchemaVersion(),
tbl.getCompactionPolicy(),
+ tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+ tbl.getTimeSeriesCompactionFileCountThreshold(),
+ tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
+ tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ tbl.getTimeSeriesCompactionLevelThreshold());
requestBuilder.addTabletMetas(builder);
} // end for rollupTablets
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index 4f474d188d4..cdfcf0a5e20 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -201,7 +201,12 @@ public class CloudSchemaChangeJobV2 extends
SchemaChangeJobV2 {
tbl.getStoragePolicy(), tbl.isInMemory(), true,
tbl.getName(), tbl.getTTLSeconds(),
tbl.getEnableUniqueKeyMergeOnWrite(),
tbl.storeRowColumn(),
- shadowSchemaVersion);
+ shadowSchemaVersion, tbl.getCompactionPolicy(),
+ tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+
tbl.getTimeSeriesCompactionFileCountThreshold(),
+
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
+
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ tbl.getTimeSeriesCompactionLevelThreshold());
requestBuilder.addTabletMetas(builder);
} // end for rollupTablets
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index d06212e6959..e50ba4a0ca4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2188,7 +2188,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
if (tableProperty != null) {
return tableProperty.compactionPolicy();
}
- return "";
+ return PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
}
public void setTimeSeriesCompactionGoalSizeMbytes(long
timeSeriesCompactionGoalSizeMbytes) {
@@ -2202,7 +2202,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
if (tableProperty != null) {
return tableProperty.timeSeriesCompactionGoalSizeMbytes();
}
- return null;
+ return
PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
}
public void setTimeSeriesCompactionFileCountThreshold(long
timeSeriesCompactionFileCountThreshold) {
@@ -2216,7 +2216,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
if (tableProperty != null) {
return tableProperty.timeSeriesCompactionFileCountThreshold();
}
- return null;
+ return
PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
}
public void setTimeSeriesCompactionTimeThresholdSeconds(long
timeSeriesCompactionTimeThresholdSeconds) {
@@ -2231,7 +2231,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
if (tableProperty != null) {
return tableProperty.timeSeriesCompactionTimeThresholdSeconds();
}
- return null;
+ return
PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
}
public void setTimeSeriesCompactionEmptyRowsetsThreshold(long
timeSeriesCompactionEmptyRowsetsThreshold) {
@@ -2245,7 +2245,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
if (tableProperty != null) {
return tableProperty.timeSeriesCompactionEmptyRowsetsThreshold();
}
- return null;
+ return
PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE;
}
public void setTimeSeriesCompactionLevelThreshold(long
timeSeriesCompactionLevelThreshold) {
@@ -2259,7 +2259,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
if (tableProperty != null) {
return tableProperty.timeSeriesCompactionLevelThreshold();
}
- return null;
+ return
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
}
public int getBaseSchemaVersion() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
index 573b394e79f..48602a0a8a4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java
@@ -92,7 +92,13 @@ public class CloudSchemaChangeHandler extends
SchemaChangeHandler {
throws UserException {
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
||
properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
- ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS));
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
+ ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
if (properties.size() != 1) {
throw new UserException("Can only set one table property at a
time");
@@ -149,6 +155,123 @@ public class CloudSchemaChangeHandler extends
SchemaChangeHandler {
}
param.groupCommitDataBytes = groupCommitDataBytes;
param.type =
UpdatePartitionMetaParam.TabletMetaType.GROUP_COMMIT_DATA_BYTES;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+ String compactionPolicy =
properties.get(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY);
+ if (compactionPolicy != null
+ &&
!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
+ &&
!compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) {
+ throw new UserException("Table compaction policy only support
for "
+ + PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY
+ + " or " +
PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
+ }
+ olapTable.readLock();
+ try {
+ if (compactionPolicy == olapTable.getCompactionPolicy()) {
+ LOG.info("compactionPolicy:{} is equal with
olapTable.getCompactionPolicy():{}",
+ compactionPolicy, olapTable.getCompactionPolicy());
+ return;
+ }
+ partitions.addAll(olapTable.getPartitions());
+ } finally {
+ olapTable.readUnlock();
+ }
+ param.compactionPolicy = compactionPolicy;
+ param.type =
UpdatePartitionMetaParam.TabletMetaType.COMPACTION_POLICY;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
{
+ long timeSeriesCompactionGoalSizeMbytes =
Long.parseLong(properties.get(PropertyAnalyzer
+ .PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES));
+ olapTable.readLock();
+ try {
+ if (timeSeriesCompactionGoalSizeMbytes
+ == olapTable.getTimeSeriesCompactionGoalSizeMbytes()) {
+ LOG.info("timeSeriesCompactionGoalSizeMbytes:{} is equal
with"
+ + "
olapTable.timeSeriesCompactionGoalSizeMbytes():{}",
+ timeSeriesCompactionGoalSizeMbytes,
+ olapTable.getTimeSeriesCompactionGoalSizeMbytes());
+ return;
+ }
+ partitions.addAll(olapTable.getPartitions());
+ } finally {
+ olapTable.readUnlock();
+ }
+ param.timeSeriesCompactionGoalSizeMbytes =
timeSeriesCompactionGoalSizeMbytes;
+ param.type =
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
{
+ long timeSeriesCompactionFileCountThreshold =
Long.parseLong(properties.get(PropertyAnalyzer
+ .PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD));
+ olapTable.readLock();
+ try {
+ if (timeSeriesCompactionFileCountThreshold
+ ==
olapTable.getTimeSeriesCompactionFileCountThreshold()) {
+ LOG.info("timeSeriesCompactionFileCountThreshold:{} is
equal with"
+ + "
olapTable.getTimeSeriesCompactionFileCountThreshold():{}",
+ timeSeriesCompactionFileCountThreshold,
+
olapTable.getTimeSeriesCompactionFileCountThreshold());
+ return;
+ }
+ partitions.addAll(olapTable.getPartitions());
+ } finally {
+ olapTable.readUnlock();
+ }
+ param.timeSeriesCompactionFileCountThreshold =
timeSeriesCompactionFileCountThreshold;
+ param.type =
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
{
+ long timeSeriesCompactionTimeThresholdSeconds =
Long.parseLong(properties.get(PropertyAnalyzer
+
.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS));
+ olapTable.readLock();
+ try {
+ if (timeSeriesCompactionTimeThresholdSeconds
+ ==
olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) {
+ LOG.info("timeSeriesCompactionTimeThresholdSeconds:{} is
equal with"
+ + "
olapTable.getTimeSeriesCompactionTimeThresholdSeconds():{}",
+ timeSeriesCompactionTimeThresholdSeconds,
+
olapTable.getTimeSeriesCompactionTimeThresholdSeconds());
+ return;
+ }
+ partitions.addAll(olapTable.getPartitions());
+ } finally {
+ olapTable.readUnlock();
+ }
+ param.timeSeriesCompactionTimeThresholdSeconds =
timeSeriesCompactionTimeThresholdSeconds;
+ param.type =
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD))
{
+ long timeSeriesCompactionEmptyRowsetsThreshold =
Long.parseLong(properties.get(PropertyAnalyzer
+
.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD));
+ olapTable.readLock();
+ try {
+ if (timeSeriesCompactionEmptyRowsetsThreshold
+ ==
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()) {
+ LOG.info("timeSeriesCompactionEmptyRowsetsThreshold:{} is
equal with"
+ + "
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold():{}",
+ timeSeriesCompactionEmptyRowsetsThreshold,
+
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold());
+ return;
+ }
+ partitions.addAll(olapTable.getPartitions());
+ } finally {
+ olapTable.readUnlock();
+ }
+ param.timeSeriesCompactionEmptyRowsetsThreshold =
timeSeriesCompactionEmptyRowsetsThreshold;
+ param.type =
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD;
+ } else if
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
{
+ long timeSeriesCompactionLevelThreshold =
Long.parseLong(properties.get(PropertyAnalyzer
+ .PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
+ olapTable.readLock();
+ try {
+ if (timeSeriesCompactionLevelThreshold
+ == olapTable.getTimeSeriesCompactionLevelThreshold()) {
+ LOG.info("timeSeriesCompactionLevelThreshold:{} is equal
with"
+ + "
olapTable.getTimeSeriesCompactionLevelThreshold():{}",
+ timeSeriesCompactionLevelThreshold,
+ olapTable.getTimeSeriesCompactionLevelThreshold());
+ return;
+ }
+ partitions.addAll(olapTable.getPartitions());
+ } finally {
+ olapTable.readUnlock();
+ }
+ param.timeSeriesCompactionLevelThreshold =
timeSeriesCompactionLevelThreshold;
+ param.type =
UpdatePartitionMetaParam.TabletMetaType.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD;
} else {
LOG.warn("invalid properties:{}", properties);
throw new UserException("invalid properties");
@@ -173,6 +296,12 @@ public class CloudSchemaChangeHandler extends SchemaChangeHandler {
TTL_SECONDS,
GROUP_COMMIT_INTERVAL_MS,
GROUP_COMMIT_DATA_BYTES,
+ COMPACTION_POLICY,
+ TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
+ TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
+ TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
+ TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD,
+ TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
}
TabletMetaType type;
@@ -181,6 +310,12 @@ public class CloudSchemaChangeHandler extends SchemaChangeHandler {
long ttlSeconds = 0;
long groupCommitIntervalMs = 0;
long groupCommitDataBytes = 0;
+ String compactionPolicy;
+ long timeSeriesCompactionGoalSizeMbytes = 0;
+ long timeSeriesCompactionFileCountThreshold = 0;
+ long timeSeriesCompactionTimeThresholdSeconds = 0;
+ long timeSeriesCompactionEmptyRowsetsThreshold = 0;
+ long timeSeriesCompactionLevelThreshold = 0;
}
public void updateCloudPartitionMeta(Database db,
@@ -228,6 +363,29 @@ public class CloudSchemaChangeHandler extends SchemaChangeHandler {
case GROUP_COMMIT_DATA_BYTES:
infoBuilder.setGroupCommitDataBytes(param.groupCommitDataBytes);
break;
+ case COMPACTION_POLICY:
+ infoBuilder.setCompactionPolicy(param.compactionPolicy);
+ break;
+ case TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES:
+ infoBuilder.setTimeSeriesCompactionGoalSizeMbytes(
+ param.timeSeriesCompactionGoalSizeMbytes);
+ break;
+ case TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD:
+ infoBuilder.setTimeSeriesCompactionFileCountThreshold(
+ param.timeSeriesCompactionFileCountThreshold);
+ break;
+ case TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS:
+ infoBuilder.setTimeSeriesCompactionTimeThresholdSeconds(
+ param.timeSeriesCompactionTimeThresholdSeconds);
+ break;
+ case TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD:
+ infoBuilder.setTimeSeriesCompactionEmptyRowsetsThreshold(
+ param.timeSeriesCompactionEmptyRowsetsThreshold);
+ break;
+ case TIME_SERIES_COMPACTION_LEVEL_THRESHOLD:
+ infoBuilder.setTimeSeriesCompactionLevelThreshold(
+ param.timeSeriesCompactionLevelThreshold);
+ break;
default:
throw new UserException("Unknown TabletMetaType");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 1eaf415b152..42ca4bfe99c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -155,7 +155,12 @@ public class CloudInternalCatalog extends InternalCatalog {
partitionId, tablet, tabletType, schemaHash, keysType, shortKeyColumnCount,
bfColumns, tbl.getBfFpp(), indexes, columns, tbl.getDataSortInfo(),
tbl.getCompressionType(), storagePolicy, isInMemory, false, tbl.getName(), tbl.getTTLSeconds(),
- tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), indexMeta.getSchemaVersion());
+ tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), indexMeta.getSchemaVersion(),
+ tbl.getCompactionPolicy(), tbl.getTimeSeriesCompactionGoalSizeMbytes(),
+ tbl.getTimeSeriesCompactionFileCountThreshold(),
+ tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
+ tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+ tbl.getTimeSeriesCompactionLevelThreshold());
requestBuilder.addTabletMetas(builder);
}
if (!storageVaultIdSet && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
@@ -198,7 +203,10 @@ public class CloudInternalCatalog extends InternalCatalog {
List<Column> schemaColumns, DataSortInfo dataSortInfo, TCompressionType compressionType,
String storagePolicy, boolean isInMemory, boolean isShadow,
String tableName, long ttlSeconds, boolean enableUniqueKeyMergeOnWrite,
- boolean storeRowColumn, int schemaVersion) throws DdlException {
+ boolean storeRowColumn, int schemaVersion, String compactionPolicy,
+ Long timeSeriesCompactionGoalSizeMbytes, Long timeSeriesCompactionFileCountThreshold,
+ Long timeSeriesCompactionTimeThresholdSeconds, Long timeSeriesCompactionEmptyRowsetsThreshold,
+ Long timeSeriesCompactionLevelThreshold) throws DdlException {
OlapFile.TabletMetaCloudPB.Builder builder = OlapFile.TabletMetaCloudPB.newBuilder();
builder.setTableId(tableId);
builder.setIndexId(indexId);
@@ -227,6 +235,13 @@ public class CloudInternalCatalog extends InternalCatalog {
builder.setReplicaId(tablet.getReplicas().get(0).getId());
builder.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
+ builder.setCompactionPolicy(compactionPolicy);
+ builder.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes);
+ builder.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
+ builder.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
+ builder.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
+ builder.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
+
OlapFile.TabletSchemaCloudPB.Builder schemaBuilder = OlapFile.TabletSchemaCloudPB.newBuilder();
schemaBuilder.setSchemaVersion(schemaVersion);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 3054b9245f5..b856d50206b 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -475,6 +475,12 @@ message TabletMetaInfoPB { // For update tablet meta
optional int64 ttl_seconds = 4;
optional int64 group_commit_interval_ms = 5;
optional int64 group_commit_data_bytes = 6;
+ optional string compaction_policy = 7;
+ optional int64 time_series_compaction_goal_size_mbytes = 8;
+ optional int64 time_series_compaction_file_count_threshold = 9;
+ optional int64 time_series_compaction_time_threshold_seconds = 10;
+ optional int64 time_series_compaction_empty_rowsets_threshold = 11;
+ optional int64 time_series_compaction_level_threshold = 12;
}
message TabletCompactionJobPB {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]