This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dc713f993ed [Enhancement](full compaction) Add run status support for
full compaction (#34043)
dc713f993ed is described below
commit dc713f993eddc117c7ca8eff0c7cc5da973a3314
Author: abmdocrt <[email protected]>
AuthorDate: Sat Apr 27 13:51:28 2024 +0800
[Enhancement](full compaction) Add run status support for full compaction
(#34043)
* The usage is `curl
http://{ip}:{port}/api/compaction/run_status?tablet_id={tablet_id}`
e.g. `curl http://127.0.0.1:8040/api/compaction/run_status?tablet_id=10084`
If full compaction is running, the output will be
```
{
"status" : "Success",
"run_status" : true,
"msg" : "compaction task for this tablet is running",
"tablet_id" : 10084,
"compact_type" : "full"
}
```
Otherwise, the output will be
```
{
"status" : "Success",
"run_status" : false,
"msg" : "compaction task for this tablet is not running",
"tablet_id" : 10084,
"compact_type" : "full"
}
```
* 2
---
be/src/http/action/compaction_action.cpp | 14 +++
be/src/olap/full_compaction.cpp | 6 +-
be/src/olap/olap_server.cpp | 12 ++-
be/src/olap/storage_engine.cpp | 24 +++++
be/src/olap/storage_engine.h | 1 +
be/src/olap/tablet.cpp | 2 +-
be/src/olap/tablet.h | 6 ++
.../test_full_compaciton_run_status.groovy | 100 +++++++++++++++++++++
8 files changed, 161 insertions(+), 4 deletions(-)
diff --git a/be/src/http/action/compaction_action.cpp
b/be/src/http/action/compaction_action.cpp
index f61edf92fc1..43ad940db5e 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -193,6 +193,20 @@ Status
CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st
std::string compaction_type;
bool run_status = false;
+ {
+ // Full compaction holds both base compaction lock and cumu
compaction lock.
+ // So we can not judge if full compaction is running by check
these two locks holding.
+ // Here, we use a variable 'is_full_compaction_running' to check
if full compaction is running.
+ if (tablet->is_full_compaction_running()) {
+ msg = "compaction task for this tablet is running";
+ compaction_type = "full";
+ run_status = true;
+ *json_result = strings::Substitute(json_template, run_status,
msg, tablet_id,
+ compaction_type);
+ return Status::OK();
+ }
+ }
+
{
// use try lock to check this tablet is running cumulative
compaction
std::unique_lock<std::mutex>
lock_cumulative(tablet->get_cumulative_compaction_lock(),
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 0d6660ca543..8a2712c38b5 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -46,7 +46,9 @@ FullCompaction::FullCompaction(StorageEngine& engine, const
TabletSharedPtr& tab
: CompactionMixin(engine, tablet, "FullCompaction:" +
std::to_string(tablet->tablet_id())) {
}
-FullCompaction::~FullCompaction() = default;
+FullCompaction::~FullCompaction() {
+ tablet()->set_is_full_compaction_running(false);
+}
Status FullCompaction::prepare_compact() {
if (!tablet()->init_succeeded()) {
@@ -55,6 +57,7 @@ Status FullCompaction::prepare_compact() {
std::unique_lock base_lock(tablet()->get_base_compaction_lock());
std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock());
+ tablet()->set_is_full_compaction_running(true);
// 1. pick rowsets to compact
RETURN_IF_ERROR(pick_rowsets_to_compact());
@@ -112,6 +115,7 @@ Status FullCompaction::modify_rowsets() {
std::lock_guard<std::mutex>
rowset_update_wlock(tablet()->get_rowset_update_lock());
std::lock_guard<std::shared_mutex>
meta_wlock(_tablet->get_header_lock());
RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets,
_input_rowsets, true));
+ DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); })
tablet()->save_meta();
}
return Status::OK();
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 73bd0e37f81..17ad3979fae 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -918,11 +918,16 @@ bool
StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table
.insert(tablet->tablet_id())
.second);
break;
- default:
+ case CompactionType::BASE_COMPACTION:
already_existed =
!(_tablet_submitted_base_compaction[tablet->data_dir()]
.insert(tablet->tablet_id())
.second);
break;
+ case CompactionType::FULL_COMPACTION:
+ already_existed =
!(_tablet_submitted_full_compaction[tablet->data_dir()]
+ .insert(tablet->tablet_id())
+ .second);
+ break;
}
return already_existed;
}
@@ -935,9 +940,12 @@ void
StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
case CompactionType::CUMULATIVE_COMPACTION:
removed =
_tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id());
break;
- default:
+ case CompactionType::BASE_COMPACTION:
removed =
_tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id());
break;
+ case CompactionType::FULL_COMPACTION:
+ removed =
_tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+ break;
}
if (removed == 1) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index d327f82ab33..d549e17b1bf 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1396,6 +1396,30 @@ Status
StorageEngine::get_compaction_status_json(std::string* result) {
}
root.AddMember(base_key, path_obj2, root.GetAllocator());
+ // full
+ const std::string& full = "FullCompaction";
+ rapidjson::Value full_key;
+ full_key.SetString(full.c_str(), full.length(), root.GetAllocator());
+ rapidjson::Document path_obj3;
+ path_obj3.SetObject();
+ for (auto& it : _tablet_submitted_full_compaction) {
+ const std::string& dir = it.first->path();
+ rapidjson::Value path_key;
+ path_key.SetString(dir.c_str(), dir.length(),
path_obj3.GetAllocator());
+
+ rapidjson::Document arr;
+ arr.SetArray();
+
+ for (auto& tablet_id : it.second) {
+ rapidjson::Value key;
+ const std::string& key_str = std::to_string(tablet_id);
+ key.SetString(key_str.c_str(), key_str.length(),
path_obj3.GetAllocator());
+ arr.PushBack(key, root.GetAllocator());
+ }
+ path_obj3.AddMember(path_key, arr, path_obj3.GetAllocator());
+ }
+ root.AddMember(full_key, path_obj3, root.GetAllocator());
+
rapidjson::StringBuffer strbuf;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
root.Accept(writer);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 0412a576e16..63234047305 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -441,6 +441,7 @@ private:
// a tablet can do base and cumulative compaction at same time
std::map<DataDir*, std::unordered_set<TTabletId>>
_tablet_submitted_cumu_compaction;
std::map<DataDir*, std::unordered_set<TTabletId>>
_tablet_submitted_base_compaction;
+ std::map<DataDir*, std::unordered_set<TTabletId>>
_tablet_submitted_full_compaction;
std::mutex _low_priority_task_nums_mutex;
std::unordered_map<DataDir*, int32_t> _low_priority_task_nums;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 94889bbcf8f..358292463fc 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1241,7 +1241,7 @@ void Tablet::get_compaction_status(std::string*
json_result) {
root.AddMember("last base failure time", base_value, root.GetAllocator());
rapidjson::Value full_value;
format_str =
ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
- base_value.SetString(format_str.c_str(), format_str.length(),
root.GetAllocator());
+ full_value.SetString(format_str.c_str(), format_str.length(),
root.GetAllocator());
root.AddMember("last full failure time", full_value, root.GetAllocator());
rapidjson::Value cumu_success_value;
format_str =
ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 41f67b898ce..678a519cfae 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -459,6 +459,10 @@ public:
void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
bool is_alter_failed() { return _alter_failed; }
+ void set_is_full_compaction_running(bool is_full_compaction_running) {
+ _is_full_compaction_running = is_full_compaction_running;
+ }
+ inline bool is_full_compaction_running() const { return
_is_full_compaction_running; }
void clear_cache() override;
private:
@@ -573,6 +577,8 @@ private:
std::atomic<bool> _alter_failed = false;
int64_t _io_error_times = 0;
+
+ std::atomic_bool _is_full_compaction_running = false;
};
inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
diff --git
a/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy
b/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy
new file mode 100644
index 00000000000..4f720d51331
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_full_compaction_run_status","nonConcurrent") {
+
+
+ def tableName = "full_compaction_run_status_test"
+
+ // test successful group commit async load
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ String backend_id;
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k`)
+ BUCKETS 2
+ properties(
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true")
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES (0,00)"""
+ sql """ INSERT INTO ${tableName} VALUES (1,10)"""
+ sql """ INSERT INTO ${tableName} VALUES (2,20)"""
+ sql """ INSERT INTO ${tableName} VALUES (3,30)"""
+ sql """ INSERT INTO ${tableName} VALUES (4,40)"""
+ sql """ INSERT INTO ${tableName} VALUES (5,50)"""
+ sql """ INSERT INTO ${tableName} VALUES (6,60)"""
+ sql """ INSERT INTO ${tableName} VALUES (7,70)"""
+ sql """ INSERT INTO ${tableName} VALUES (8,80)"""
+ sql """ INSERT INTO ${tableName} VALUES (9,90)"""
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def exception = false;
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep")
+ def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+
+ times = 1
+ do{
+ (code, out, err) =
be_run_full_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out +
", err=" + err)
+ ++times
+ sleep(1000)
+ } while (parseJson(out.trim()).status.toLowerCase()!="success" &&
times<=10)
+
+ (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" + out
+ ", err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ assertTrue(compactJson.msg.toLowerCase().contains("is running"))
+ }
+ Thread.sleep(30000)
+ logger.info("sleep 30s to wait full compaction finish.")
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+
+ (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" + out
+ ", err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ assertTrue(compactJson.msg.toLowerCase().contains("is not
running"))
+ }
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ exception = true;
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep")
+ assertFalse(exception)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]