This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new ae30d6ad1ca [fix](compaction test) show single replica compaction
status and fix test (#33076) (#34285)
ae30d6ad1ca is described below
commit ae30d6ad1ca8675e59a8afffdca9e68a43074a8c
Author: Sun Chenyang <[email protected]>
AuthorDate: Mon Apr 29 19:35:29 2024 +0800
[fix](compaction test) show single replica compaction status and fix test
(#33076) (#34285)
---
be/src/http/action/compaction_action.cpp | 55 ++++++++++++++++++++++---------
be/src/http/action/compaction_action.h | 4 ++-
be/src/olap/single_replica_compaction.cpp | 20 +++++------
be/src/olap/tablet.cpp | 47 ++++++++++++++++++++++++--
be/src/olap/tablet.h | 12 +++++++
5 files changed, 110 insertions(+), 28 deletions(-)
diff --git a/be/src/http/action/compaction_action.cpp
b/be/src/http/action/compaction_action.cpp
index c81c65da1f4..2332f719a7a 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -40,6 +40,7 @@
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/full_compaction.h"
#include "olap/olap_define.h"
+#include "olap/single_replica_compaction.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/doris_metrics.h"
@@ -115,6 +116,15 @@ Status
CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
return Status::NotSupported("The compaction type '{}' is not
supported", compaction_type);
}
+ // "remote" = "true" means tablet should do single replica compaction to
fetch rowset from peer
+ bool fetch_from_remote = false;
+ std::string param_remote = req->param(PARAM_COMPACTION_REMOTE);
+ if (param_remote == "true") {
+ fetch_from_remote = true;
+ } else if (!param_remote.empty() && param_remote != "false") {
+ return Status::NotSupported("The remote = '{}' is not supported",
param_remote);
+ }
+
if (tablet_id == 0 && table_id != 0) {
std::vector<TabletSharedPtr> tablet_vec =
StorageEngine::instance()->tablet_manager()->get_all_tablet(
@@ -132,9 +142,13 @@ Status
CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
return Status::NotFound("Tablet not found. tablet_id={}",
tablet_id);
}
+ if (fetch_from_remote && !tablet->should_fetch_from_peer()) {
+ return Status::NotSupported("tablet should do compaction locally");
+ }
+
// 3. execute compaction task
- std::packaged_task<Status()> task([this, tablet, compaction_type]() {
- return _execute_compaction_callback(tablet, compaction_type);
+ std::packaged_task<Status()> task([this, tablet, compaction_type,
fetch_from_remote]() {
+ return _execute_compaction_callback(tablet, compaction_type,
fetch_from_remote);
});
std::future<Status> future_obj = task.get_future();
std::thread(std::move(task)).detach();
@@ -226,7 +240,8 @@ Status
CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st
}
Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
- const std::string&
compaction_type) {
+ const std::string&
compaction_type,
+ bool fetch_from_remote) {
MonotonicStopWatch timer;
timer.start();
@@ -252,17 +267,28 @@ Status
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
}
}
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
- CumulativeCompaction cumulative_compaction(tablet);
- res = cumulative_compaction.compact();
- if (!res) {
- if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
- // Ignore this error code.
- VLOG_NOTICE << "failed to init cumulative compaction due to no
suitable version,"
- << "tablet=" << tablet->full_name();
- } else {
-
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
- LOG(WARNING) << "failed to do cumulative compaction. res=" <<
res
- << ", table=" << tablet->full_name();
+ if (fetch_from_remote) {
+ SingleReplicaCompaction single_compaction(tablet,
+
CompactionType::CUMULATIVE_COMPACTION);
+ res = single_compaction.compact();
+ if (!res) {
+ LOG(WARNING) << "failed to do single compaction. res=" << res
+ << ", table=" << tablet->tablet_id();
+ }
+ } else {
+ CumulativeCompaction cumulative_compaction(tablet);
+ res = cumulative_compaction.compact();
+ if (!res) {
+ if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
+ // Ignore this error code.
+ VLOG_NOTICE
+ << "failed to init cumulative compaction due to no
suitable version,"
+ << "tablet=" << tablet->tablet_id();
+ } else {
+
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
+ LOG(WARNING) << "failed to do cumulative compaction. res="
<< res
+ << ", table=" << tablet->tablet_id();
+ }
}
}
} else if (compaction_type == PARAM_COMPACTION_FULL) {
@@ -279,7 +305,6 @@ Status
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
}
}
}
-
timer.stop();
LOG(INFO) << "Manual compaction task finish, status=" << res
<< ", compaction_use_time=" << timer.elapsed_time() / 1000000 <<
"ms";
diff --git a/be/src/http/action/compaction_action.h
b/be/src/http/action/compaction_action.h
index 1f0e67c6e92..111a54549ff 100644
--- a/be/src/http/action/compaction_action.h
+++ b/be/src/http/action/compaction_action.h
@@ -40,6 +40,7 @@ const std::string PARAM_COMPACTION_TYPE = "compact_type";
const std::string PARAM_COMPACTION_BASE = "base";
const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
const std::string PARAM_COMPACTION_FULL = "full";
+const std::string PARAM_COMPACTION_REMOTE = "remote";
/// This action is used for viewing the compaction status.
/// See compaction-action.md for details.
@@ -60,7 +61,8 @@ private:
Status _handle_run_compaction(HttpRequest* req, std::string* json_result);
/// thread callback function for the tablet to do compaction
- Status _execute_compaction_callback(TabletSharedPtr tablet, const
std::string& compaction_type);
+ Status _execute_compaction_callback(TabletSharedPtr tablet, const
std::string& compaction_type,
+ bool fethch_from_remote);
/// fetch compaction running status
Status _handle_run_status_compaction(HttpRequest* req, std::string*
json_result);
diff --git a/be/src/olap/single_replica_compaction.cpp
b/be/src/olap/single_replica_compaction.cpp
index 9e3c1843aaa..f6bd02ed4b7 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -66,6 +66,9 @@ Status SingleReplicaCompaction::pick_rowsets_to_compact() {
}
Status SingleReplicaCompaction::execute_compact_impl() {
+ if (!_tablet->should_fetch_from_peer()) {
+ return Status::Aborted("compaction should be performed locally");
+ }
std::unique_lock<std::mutex>
lock_cumu(_tablet->get_cumulative_compaction_lock(),
std::try_to_lock);
if (!lock_cumu.owns_lock()) {
@@ -114,8 +117,7 @@ Status
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
Version proper_version;
// 3. find proper version to fetch
if (!_find_rowset_to_fetch(peer_versions, &proper_version)) {
- LOG(WARNING) << _tablet->tablet_id() << " tablet don't need to fetch,
no matched version";
- return Status::Aborted("no matched version to fetch");
+ return Status::Cancelled("no matched versions for single replica
compaction");
}
// 4. fetch compaction result
@@ -132,6 +134,8 @@ Status
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
_tablet->set_last_full_compaction_success_time(UnixMillis());
}
+ _tablet->set_last_fetched_version(_output_rowset->version());
+
int64_t current_max_version;
{
std::shared_lock rdlock(_tablet->get_header_lock());
@@ -165,23 +169,19 @@ Status
SingleReplicaCompaction::_get_rowset_verisons_from_peer(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host,
addr.brpc_port);
if (stub == nullptr) {
- LOG(WARNING) << "get rowset versions from peer: get rpc stub failed,
host = " << addr.host
- << " port = " << addr.brpc_port;
- return Status::Cancelled("get rpc stub failed");
+ return Status::Aborted("get rpc stub failed");
}
brpc::Controller cntl;
stub->get_tablet_rowset_versions(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {
- LOG(WARNING) << "open brpc connection to " << addr.host << " failed: "
<< cntl.ErrorText();
- return Status::Cancelled("open brpc connection failed");
+ return Status::Aborted("open brpc connection failed");
}
if (response.status().status_code() != 0) {
- LOG(WARNING) << "peer " << addr.host << " don't have tablet " <<
_tablet->tablet_id();
- return Status::Cancelled("peer don't have tablet");
+ return Status::Aborted("peer don't have tablet");
}
if (response.versions_size() == 0) {
- return Status::Cancelled("no peer version");
+ return Status::Aborted("no peer version");
}
for (int i = 0; i < response.versions_size(); ++i) {
(*peer_versions)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8cfc803ad27..d1d6fa19066 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1524,6 +1524,38 @@ void Tablet::get_compaction_status(std::string*
json_result) {
root.GetAllocator());
root.AddMember("last base status", base_compaction_status_value,
root.GetAllocator());
+ // last single replica compaction status
+ // "single replica compaction status": {
+ // "remote peer": "172.100.1.0:10875",
+ // "last failure status": "",
+ // "last fetched rowset": "[8-10]"
+ // }
+ rapidjson::Document status;
+ status.SetObject();
+ TReplicaInfo replica_info;
+ std::string dummp_token;
+ if (tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
+ StorageEngine::instance()->get_peer_replica_info(tablet_id(),
&replica_info,
+ &dummp_token)) {
+ // remote peer
+ rapidjson::Value peer_addr;
+ std::string addr = replica_info.host + ":" +
std::to_string(replica_info.brpc_port);
+ peer_addr.SetString(addr.c_str(), addr.length(),
status.GetAllocator());
+ status.AddMember("remote peer", peer_addr, status.GetAllocator());
+ // last failure status
+ rapidjson::Value compaction_status;
+
compaction_status.SetString(_last_single_compaction_failure_status.c_str(),
+
_last_single_compaction_failure_status.length(),
+ status.GetAllocator());
+ status.AddMember("last failure status", compaction_status,
status.GetAllocator());
+ // last fetched rowset
+ rapidjson::Value version;
+ std::string fetched_version = _last_fetched_version.to_string();
+ version.SetString(fetched_version.c_str(), fetched_version.length(),
status.GetAllocator());
+ status.AddMember("last fetched rowset", version,
status.GetAllocator());
+ root.AddMember("single replica compaction status", status,
root.GetAllocator());
+ }
+
// print all rowsets' version as an array
rapidjson::Document versions_arr;
rapidjson::Document missing_versions_arr;
@@ -1883,13 +1915,24 @@ void
Tablet::execute_single_replica_compaction(SingleReplicaCompaction& compacti
Status res = compaction.execute_compact();
if (!res.ok()) {
set_last_failure_time(this, compaction, UnixMillis());
- LOG(WARNING) << "failed to do single replica compaction. res=" << res
- << ", tablet=" << full_name();
+ set_last_single_compaction_failure_status(res.to_string());
+ if (res.is<CANCELLED>()) {
+ VLOG_CRITICAL << "Cannel fetching from the remote peer. res=" <<
res
+ << ", tablet=" << tablet_id();
+ } else {
+ LOG(WARNING) << "failed to do single replica compaction. res=" <<
res
+ << ", tablet=" << tablet_id();
+ }
return;
}
set_last_failure_time(this, compaction, 0);
}
+bool Tablet::should_fetch_from_peer() {
+ return tablet_meta()->tablet_schema()->enable_single_replica_compaction()
&&
+ StorageEngine::instance()->should_fetch_from_peer(tablet_id());
+}
+
std::vector<Version> Tablet::get_all_local_versions() {
std::vector<Version> local_versions;
{
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index f3ee91b9d1b..29d7209b906 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -259,6 +259,12 @@ public:
_last_base_compaction_schedule_millis = millis;
}
+ void set_last_single_compaction_failure_status(std::string status) {
+ _last_single_compaction_failure_status = std::move(status);
+ }
+
+ void set_last_fetched_version(Version version) { _last_fetched_version =
std::move(version); }
+
void delete_all_files();
void check_tablet_path_exists();
@@ -332,6 +338,8 @@ public:
std::string get_last_base_compaction_status() { return
_last_base_compaction_status; }
+ bool should_fetch_from_peer();
+
inline bool all_beta() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
@@ -680,6 +688,10 @@ private:
std::atomic<int64_t> _last_checkpoint_time;
std::string _last_base_compaction_status;
+ // single replica compaction status
+ std::string _last_single_compaction_failure_status;
+ Version _last_fetched_version;
+
// cumulative compaction policy
std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
std::string_view _cumulative_compaction_type;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]