This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9d45269b041 [Feature](cloud) Add cloud report for clean up expired
tablets (#42066)
9d45269b041 is described below
commit 9d45269b041441d08d032a79a1504d67ecc8b103
Author: deardeng <[email protected]>
AuthorDate: Tue Nov 5 00:35:17 2024 +0800
[Feature](cloud) Add cloud report for clean up expired tablets (#42066)
1. Add a tablet report from BE to FE in cloud mode.
2. Clean up expired tablets on BE.
---
be/src/agent/agent_server.cpp | 15 +-
be/src/agent/heartbeat_server.cpp | 6 +
be/src/agent/task_worker_pool.cpp | 61 ++++++++
be/src/agent/task_worker_pool.h | 4 +
be/src/cloud/cloud_tablet.cpp | 8 ++
be/src/cloud/cloud_tablet.h | 3 +
be/src/cloud/cloud_tablet_mgr.cpp | 72 +++++++++-
be/src/cloud/cloud_tablet_mgr.h | 16 +++
be/src/cloud/config.cpp | 1 +
be/src/cloud/config.h | 2 +
be/src/http/action/tablets_info_action.cpp | 20 +--
be/src/olap/base_tablet.h | 3 +
be/src/olap/tablet.cpp | 4 -
be/src/olap/tablet.h | 3 -
be/src/service/http_service.cpp | 2 +-
.../apache/doris/cloud/CacheHotspotManager.java | 4 +-
.../doris/cloud/catalog/CloudTabletRebalancer.java | 42 +++++-
.../doris/cloud/master/CloudReportHandler.java | 89 ++++++++++++
.../java/org/apache/doris/master/MasterImpl.java | 3 +-
.../org/apache/doris/master/ReportHandler.java | 19 ++-
.../java/org/apache/doris/qe/StmtExecutor.java | 2 +-
.../java/org/apache/doris/system/HeartbeatMgr.java | 1 +
.../org/apache/doris/clone/RepairVersionTest.java | 10 +-
gensrc/thrift/HeartbeatService.thrift | 2 +
gensrc/thrift/MasterService.thrift | 2 +
.../test_clean_tablet_when_drop_force_table.groovy | 140 ++++++++++++++++++
.../test_clean_tablet_when_rebalance.groovy | 158 +++++++++++++++++++++
27 files changed, 650 insertions(+), 42 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 9d36148b64f..361a8ab93a9 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -33,6 +33,7 @@
#include "agent/utils.h"
#include "agent/workload_group_listener.h"
#include "agent/workload_sched_policy_listener.h"
+#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
@@ -193,7 +194,7 @@ void AgentServer::start_workers(StorageEngine& engine,
ExecEnv* exec_env) {
"REPORT_DISK_STATE", _master_info,
config::report_disk_state_interval_seconds, [&engine, &master_info =
_master_info] { report_disk_callback(engine, master_info); }));
_report_workers.push_back(std::make_unique<ReportWorker>(
- "REPORT_OLAP_TABLE", _master_info,
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info); }));
+ "REPORT_OLAP_TABLET", _master_info,
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info); }));
// clang-format on
}
@@ -211,6 +212,10 @@ void AgentServer::cloud_start_workers(CloudStorageEngine&
engine, ExecEnv* exec_
"CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
[&engine](auto&& task) { return
calc_delete_bitmap_callback(engine, task); });
+ // cloud, drop tablet just clean clear_cache, so just one thread do it
+ _workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>(
+ "DROP_TABLE", 1, [&engine](auto&& task) { return
drop_tablet_callback(engine, task); });
+
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info);
}));
@@ -218,6 +223,14 @@ void AgentServer::cloud_start_workers(CloudStorageEngine&
engine, ExecEnv* exec_
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_DISK_STATE", _master_info,
config::report_disk_state_interval_seconds,
[&engine, &master_info = _master_info] {
report_disk_callback(engine, master_info); }));
+
+ if (config::enable_cloud_tablet_report) {
+ _report_workers.push_back(std::make_unique<ReportWorker>(
+ "REPORT_OLAP_TABLET", _master_info,
config::report_tablet_interval_seconds,
+ [&engine, &master_info = _master_info] {
+ report_tablet_callback(engine, master_info);
+ }));
+ }
}
// TODO(lingbin): each task in the batch may have it own status or FE must
check and
diff --git a/be/src/agent/heartbeat_server.cpp
b/be/src/agent/heartbeat_server.cpp
index 146604aaab2..78002ed08fe 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -26,6 +26,7 @@
#include <ostream>
#include <string>
+#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
@@ -275,6 +276,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
LOG(INFO) << "set config cloud_unique_id " <<
master_info.cloud_unique_id << " " << st;
}
+ if (master_info.__isset.tablet_report_inactive_duration_ms) {
+ doris::g_tablet_report_inactive_duration_ms =
+ master_info.tablet_report_inactive_duration_ms;
+ }
+
if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and
disk info immediately";
_engine.notify_listeners();
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 5906511ce15..d9efe6dbedd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -48,6 +48,8 @@
#include "cloud/cloud_delete_task.h"
#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
#include "cloud/cloud_schema_change_job.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
@@ -116,6 +118,10 @@ bool register_task_info(const TTaskType::type task_type,
int64_t signature) {
// no need to report task of these types
return true;
}
+ if (task_type == TTaskType::type::DROP && config::is_cloud_mode()) {
+ // cloud no need to report drop task status
+ return true;
+ }
if (signature == -1) { // No need to report task with unintialized
signature
return true;
@@ -1134,6 +1140,46 @@ void report_tablet_callback(StorageEngine& engine, const
TMasterInfo& master_inf
}
}
+void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo&
master_info) {
+ // Random sleep 1~5 seconds before doing report.
+ // In order to avoid the problem that the FE receives many report requests
at the same time
+ // and can not be processed.
+ if (config::report_random_wait) {
+ random_sleep(5);
+ }
+
+ TReportRequest request;
+ request.__set_backend(BackendOptions::get_local_backend());
+ request.__isset.tablets = true;
+
+ increase_report_version();
+ uint64_t report_version;
+ uint64_t total_num_tablets = 0;
+ for (int i = 0; i < 5; i++) {
+ request.tablets.clear();
+ report_version = s_report_version;
+ engine.tablet_mgr().build_all_report_tablets_info(&request.tablets,
&total_num_tablets);
+ if (report_version == s_report_version) {
+ break;
+ }
+ }
+
+ if (report_version < s_report_version) {
+ LOG(WARNING) << "report version " << report_version << " change to "
<< s_report_version;
+
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
+ return;
+ }
+
+ request.__set_report_version(report_version);
+ request.__set_num_tablets(total_num_tablets);
+
+ bool succ = handle_report(request, master_info, "tablet");
+ report_tablet_total << 1;
+ if (!succ) [[unlikely]] {
+ report_tablet_failed << 1;
+ }
+}
+
void upload_callback(StorageEngine& engine, ExecEnv* env, const
TAgentTaskRequest& req) {
const auto& upload_request = req.upload_req;
@@ -1610,6 +1656,21 @@ void drop_tablet_callback(StorageEngine& engine, const
TAgentTaskRequest& req) {
remove_task_info(req.task_type, req.signature);
}
+void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest&
req) {
+ const auto& drop_tablet_req = req.drop_tablet_req;
+ DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", {
+ LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
+ .tag("tablet_id", drop_tablet_req.tablet_id);
+ return;
+ });
+ // 1. erase lru from tablet mgr
+ // TODO(dx) clean tablet file cache
+ // get tablet's info(such as cachekey, tablet id, rsid)
+ engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
+ // 2. gen clean file cache task
+ return;
+}
+
void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const auto& push_req = req.push_req;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index f51d6c2a4c0..c50ac57ffe9 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -155,6 +155,8 @@ void create_tablet_callback(StorageEngine& engine, const
TAgentTaskRequest& req)
void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req);
+void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest&
req);
+
void clear_transaction_task_callback(StorageEngine& engine, const
TAgentTaskRequest& req);
void push_callback(StorageEngine& engine, const TAgentTaskRequest& req);
@@ -188,6 +190,8 @@ void report_disk_callback(CloudStorageEngine& engine, const
TMasterInfo& master_
void report_tablet_callback(StorageEngine& engine, const TMasterInfo&
master_info);
+void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo&
master_info);
+
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const
TAgentTaskRequest& req);
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 54ea450f204..b467703637c 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -897,4 +897,12 @@ Status CloudTablet::sync_meta() {
return Status::OK();
}
+void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
+ std::shared_lock rdlock(_meta_lock);
+ tablet_info->__set_total_version_count(_tablet_meta->version_count());
+ tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
+ // Currently, this information will not be used by the cloud report,
+ // but it may be used in the future.
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index a79d25f7540..5f4785b62d2 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -196,10 +196,13 @@ public:
int64_t last_base_compaction_success_time_ms = 0;
int64_t last_cumu_compaction_success_time_ms = 0;
int64_t last_cumu_no_suitable_version_ms = 0;
+ int64_t last_access_time_ms = 0;
// Return merged extended schema
TabletSchemaSPtr merged_tablet_schema() const override;
+ void build_tablet_report_info(TTabletInfo* tablet_info);
+
private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by
version
void update_base_size(const Rowset& rs);
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index e5c31785c1e..7ecb72e62fd 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -28,6 +28,7 @@
#include "runtime/memory/cache_policy.h"
namespace doris {
+uint64_t g_tablet_report_inactive_duration_ms = 0;
namespace {
// port from
@@ -142,6 +143,12 @@ CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine)
CloudTabletMgr::~CloudTabletMgr() = default;
+void set_tablet_access_time_ms(CloudTablet* tablet) {
+ using namespace std::chrono;
+ int64_t now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ tablet->last_access_time_ms = now;
+}
+
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t
tablet_id,
bool
warmup_data) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than
`CloudTabletMgr`
@@ -181,8 +188,11 @@ Result<std::shared_ptr<CloudTablet>>
CloudTabletMgr::get_tablet(int64_t tablet_i
auto* handle = _cache->insert(key, value.release(), 1,
sizeof(CloudTablet),
CachePriority::NORMAL);
- auto ret = std::shared_ptr<CloudTablet>(
- tablet.get(), [this, handle](...) {
_cache->release(handle); });
+ auto ret =
+ std::shared_ptr<CloudTablet>(tablet.get(), [this,
handle](CloudTablet* tablet) {
+ set_tablet_access_time_ms(tablet);
+ _cache->release(handle);
+ });
_tablet_map->put(std::move(tablet));
return ret;
};
@@ -191,12 +201,16 @@ Result<std::shared_ptr<CloudTablet>>
CloudTabletMgr::get_tablet(int64_t tablet_i
if (tablet == nullptr) {
return ResultError(Status::InternalError("failed to get tablet
{}", tablet_id));
}
+ set_tablet_access_time_ms(tablet.get());
return tablet;
}
CloudTablet* tablet_raw_ptr =
reinterpret_cast<Value*>(_cache->value(handle))->tablet.get();
- auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr,
- [this, handle](...) {
_cache->release(handle); });
+ set_tablet_access_time_ms(tablet_raw_ptr);
+ auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, [this,
handle](CloudTablet* tablet) {
+ set_tablet_access_time_ms(tablet);
+ _cache->release(handle);
+ });
return tablet;
}
@@ -357,4 +371,54 @@ Status CloudTabletMgr::get_topn_tablets_to_compact(
return Status::OK();
}
+void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId,
TTablet>* tablets_info,
+ uint64_t* tablet_num) {
+ DCHECK(tablets_info != nullptr);
+ VLOG_NOTICE << "begin to build all report cloud tablets info";
+
+ HistogramStat tablet_version_num_hist;
+
+ auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) {
+ auto tablet = tablet_wk.lock();
+ if (!tablet) return;
+ (*tablet_num)++;
+ TTabletInfo tablet_info;
+ tablet->build_tablet_report_info(&tablet_info);
+ using namespace std::chrono;
+ int64_t now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ if (now - g_tablet_report_inactive_duration_ms * 1000 <
tablet->last_access_time_ms) {
+ // the tablet is still being accessed and used in recently, so not
report it
+ return;
+ }
+ auto& t_tablet = (*tablets_info)[tablet->tablet_id()];
+ // On the cloud, a specific BE has only one tablet replica;
+ // there are no multiple replicas for a specific BE.
+ // This is only to reuse the non-cloud report protocol.
+ tablet_version_num_hist.add(tablet_info.total_version_count);
+ t_tablet.tablet_infos.emplace_back(std::move(tablet_info));
+ };
+
+ auto weak_tablets = get_weak_tablets();
+ std::for_each(weak_tablets.begin(), weak_tablets.end(), handler);
+
+ DorisMetrics::instance()->tablet_version_num_distribution->set_histogram(
+ tablet_version_num_hist);
+ LOG(INFO) << "success to build all cloud report tablets info.
all_tablet_count=" << *tablet_num
+ << " exceed drop time limit count=" << tablets_info->size();
+}
+
+void CloudTabletMgr::get_tablet_info(int64_t num_tablets,
std::vector<TabletInfo>* tablets_info) {
+ auto weak_tablets = get_weak_tablets();
+ for (auto& weak_tablet : weak_tablets) {
+ auto tablet = weak_tablet.lock();
+ if (tablet == nullptr) {
+ continue;
+ }
+ if (tablets_info->size() >= num_tablets) {
+ return;
+ }
+ tablets_info->push_back(tablet->get_tablet_info());
+ }
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h
index 976d483b36c..903f372cbde 100644
--- a/be/src/cloud/cloud_tablet_mgr.h
+++ b/be/src/cloud/cloud_tablet_mgr.h
@@ -17,6 +17,9 @@
#pragma once
+#include <gen_cpp/MasterService_types.h>
+#include <gen_cpp/Types_types.h>
+
#include <functional>
#include <memory>
#include <vector>
@@ -31,6 +34,8 @@ class CloudStorageEngine;
class LRUCachePolicy;
class CountDownLatch;
+extern uint64_t g_tablet_report_inactive_duration_ms;
+
class CloudTabletMgr {
public:
CloudTabletMgr(CloudStorageEngine& engine);
@@ -65,6 +70,17 @@ public:
std::vector<std::shared_ptr<CloudTablet>>* tablets,
int64_t* max_score);
+ /**
+ * Gets tablets info and total tablet num that are reported
+ *
+ * @param tablets_info used by report
+ * @param tablet_num tablets in be tabletMgr, total num
+ */
+ void build_all_report_tablets_info(std::map<TTabletId, TTablet>*
tablets_info,
+ uint64_t* tablet_num);
+
+ void get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>*
tablets_info);
+
private:
CloudStorageEngine& _engine;
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index e724dbea84e..32e3250f87c 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -75,4 +75,5 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");
+DEFINE_mBool(enable_cloud_tablet_report, "true");
} // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 86197f924d0..8af967afb8c 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -108,4 +108,6 @@ DECLARE_mInt32(tablet_txn_info_min_expired_seconds);
DECLARE_mBool(enable_use_cloud_unique_id_from_fe);
+DECLARE_Bool(enable_cloud_tablet_report);
+
} // namespace doris::config
diff --git a/be/src/http/action/tablets_info_action.cpp
b/be/src/http/action/tablets_info_action.cpp
index 9c27c1de9a0..672b03ce6ce 100644
--- a/be/src/http/action/tablets_info_action.cpp
+++ b/be/src/http/action/tablets_info_action.cpp
@@ -24,6 +24,8 @@
#include <string>
#include <vector>
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
@@ -51,12 +53,6 @@ void TabletsInfoAction::handle(HttpRequest* req) {
EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) {
EasyJson tablets_info_ej;
- if (config::is_cloud_mode()) {
- // TODO(plat1ko): CloudStorageEngine
- tablets_info_ej["msg"] = "TabletsInfoAction::get_tablets_info is not
implemented";
- tablets_info_ej["code"] = 0;
- return tablets_info_ej;
- }
int64_t number;
std::string msg;
@@ -74,9 +70,15 @@ EasyJson TabletsInfoAction::get_tablets_info(string
tablet_num_to_return) {
msg = "Parameter Error";
}
std::vector<TabletInfo> tablets_info;
- TabletManager* tablet_manager =
-
ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager();
- tablet_manager->obtain_specific_quantity_tablets(tablets_info, number);
+ if (!config::is_cloud_mode()) {
+ TabletManager* tablet_manager =
+
ExecEnv::GetInstance()->storage_engine().to_local().tablet_manager();
+ tablet_manager->obtain_specific_quantity_tablets(tablets_info, number);
+ } else {
+ CloudTabletMgr& cloud_tablet_manager =
+
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_mgr();
+ cloud_tablet_manager.get_tablet_info(number, &tablets_info);
+ }
tablets_info_ej["msg"] = msg;
tablets_info_ej["code"] = 0;
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 11eae0e47f9..b5da0e3bf06 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -293,6 +293,9 @@ public:
Status show_nested_index_file(std::string* json_meta);
+ TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); }
+ TabletInfo get_tablet_info() const { return TabletInfo(tablet_id(),
tablet_uid()); }
+
protected:
// Find the missed versions until the spec_version.
//
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 7c69ba54831..b1d4c9dfb89 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1205,10 +1205,6 @@ Status Tablet::_contains_version(const Version& version)
{
return Status::OK();
}
-TabletInfo Tablet::get_tablet_info() const {
- return TabletInfo(tablet_id(), tablet_uid());
-}
-
std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_cumulative_compaction() {
std::vector<RowsetSharedPtr> candidate_rowsets;
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index e181af3d4d3..f5866c67641 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -115,7 +115,6 @@ public:
DataDir* data_dir() const { return _data_dir; }
int64_t replica_id() const { return _tablet_meta->replica_id(); }
- TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); }
const std::string& tablet_path() const { return _tablet_path; }
@@ -279,8 +278,6 @@ public:
void check_tablet_path_exists();
- TabletInfo get_tablet_info() const;
-
std::vector<RowsetSharedPtr>
pick_candidate_rowsets_to_cumulative_compaction();
std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_full_compaction();
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 7704d07b6f9..e7b920796a1 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -387,7 +387,7 @@ void HttpService::register_local_handler(StorageEngine&
engine) {
_ev_http_server->register_handler(HttpMethod::POST, "/api/pad_rowset",
pad_rowset_action);
ReportAction* report_tablet_action = _pool.add(new ReportAction(
- _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN,
"REPORT_OLAP_TABLE"));
+ _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN,
"REPORT_OLAP_TABLET"));
_ev_http_server->register_handler(HttpMethod::GET, "/api/report/tablet",
report_tablet_action);
ReportAction* report_disk_action = _pool.add(new ReportAction(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
index b35a3b9e911..0b83baa94d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
@@ -429,7 +429,7 @@ public class CacheHotspotManager extends MasterDaemon {
for (Backend backend : backends) {
Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
.getCloudTabletRebalancer()
- .getSnapshotTabletsByBeId(backend.getId());
+
.getSnapshotTabletsInPrimaryByBeId(backend.getId());
List<Tablet> warmUpTablets = new ArrayList<>();
for (Tablet tablet : tablets) {
if (beTabletIds.contains(tablet.getId())) {
@@ -559,7 +559,7 @@ public class CacheHotspotManager extends MasterDaemon {
for (Backend backend : backends) {
Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
.getCloudTabletRebalancer()
-
.getSnapshotTabletsByBeId(backend.getId());
+
.getSnapshotTabletsInPrimaryByBeId(backend.getId());
List<Tablet> warmUpTablets = new ArrayList<>();
for (Tablet tablet : tablets) {
if (beTabletIds.contains(tablet.getId())) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 78947afdb11..8e5033470b0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -73,6 +73,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
private volatile ConcurrentHashMap<Long, List<Tablet>>
beToColocateTabletsGlobal =
new ConcurrentHashMap<Long, List<Tablet>>();
+ // used for cloud tablet report
+ private volatile ConcurrentHashMap<Long, List<Tablet>>
beToTabletsGlobalInSecondary =
+ new ConcurrentHashMap<Long, List<Tablet>>();
+
private Map<Long, List<Tablet>> futureBeToTabletsGlobal;
private Map<String, List<Long>> clusterToBes;
@@ -164,7 +168,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
public boolean srcDecommissioned;
}
- public Set<Long> getSnapshotTabletsByBeId(Long beId) {
+ public Set<Long> getSnapshotTabletsInPrimaryByBeId(Long beId) {
Set<Long> tabletIds = Sets.newHashSet();
List<Tablet> tablets = beToTabletsGlobal.get(beId);
if (tablets != null) {
@@ -183,6 +187,24 @@ public class CloudTabletRebalancer extends MasterDaemon {
return tabletIds;
}
+ public Set<Long> getSnapshotTabletsInSecondaryByBeId(Long beId) {
+ Set<Long> tabletIds = Sets.newHashSet();
+ List<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
+ if (tablets != null) {
+ for (Tablet tablet : tablets) {
+ tabletIds.add(tablet.getId());
+ }
+ }
+ return tabletIds;
+ }
+
+ public Set<Long> getSnapshotTabletsInPrimaryAndSecondaryByBeId(Long beId) {
+ Set<Long> tabletIds = Sets.newHashSet();
+ tabletIds.addAll(getSnapshotTabletsInPrimaryByBeId(beId));
+ tabletIds.addAll(getSnapshotTabletsInSecondaryByBeId(beId));
+ return tabletIds;
+ }
+
public int getTabletNumByBackendId(long beId) {
List<Tablet> tablets = beToTabletsGlobal.get(beId);
List<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
@@ -617,6 +639,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
public void statRouteInfo() {
ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobal = new
ConcurrentHashMap<Long, List<Tablet>>();
+ ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobalInSecondary
+ = new ConcurrentHashMap<Long, List<Tablet>>();
ConcurrentHashMap<Long, List<Tablet>> tmpBeToColocateTabletsGlobal
= new ConcurrentHashMap<Long, List<Tablet>>();
@@ -641,11 +665,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
continue;
}
if (allBes.contains(beId)) {
- List<Tablet> colocateTablets =
tmpBeToColocateTabletsGlobal.get(beId);
- if (colocateTablets == null) {
- colocateTablets = new ArrayList<Tablet>();
- tmpBeToColocateTabletsGlobal.put(beId,
colocateTablets);
- }
+ List<Tablet> colocateTablets =
+
tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new ArrayList<>());
colocateTablets.add(tablet);
}
continue;
@@ -657,6 +678,14 @@ public class CloudTabletRebalancer extends MasterDaemon {
continue;
}
+ Backend secondaryBe = replica.getSecondaryBackend(cluster);
+ long secondaryBeId = secondaryBe == null ? -1L :
secondaryBe.getId();
+ if (allBes.contains(secondaryBeId)) {
+ List<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
+ .computeIfAbsent(secondaryBeId, k -> new
ArrayList<>());
+ tablets.add(tablet);
+ }
+
InfightTablet taskKey = new InfightTablet(tablet.getId(),
cluster);
InfightTask task = tabletToInfightTask.get(taskKey);
long futureBeId = task == null ? beId : task.destBe;
@@ -670,6 +699,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
});
beToTabletsGlobal = tmpBeToTabletsGlobal;
+ beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary;
beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java
new file mode 100644
index 00000000000..6564bd7d3a5
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/master/CloudReportHandler.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.master;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.master.ReportHandler;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.DropReplicaTask;
+import org.apache.doris.thrift.TTablet;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class CloudReportHandler extends ReportHandler {
+ private static final Logger LOG =
LogManager.getLogger(CloudReportHandler.class);
+
+ @Override
+ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
+ Map<Long, Long> backendPartitionsVersion, long
backendReportVersion, long numTablets) {
+ long start = System.currentTimeMillis();
+ LOG.info("backend[{}] have {} tablet(s), {} need deal tablet(s).
report version: {}",
+ backendId, numTablets, backendTablets.size(),
backendReportVersion);
+ // current be useful
+ Set<Long> tabletIdsInFe = ((CloudEnv)
Env.getCurrentEnv()).getCloudTabletRebalancer()
+ .getSnapshotTabletsInPrimaryAndSecondaryByBeId(backendId);
+
+ Set<Long> tabletIdsInBe = backendTablets.keySet();
+ // handle (be - meta)
+ Set<Long> tabletIdsNeedDrop = diffTablets(tabletIdsInFe,
tabletIdsInBe);
+ // drop agent task
+ deleteFromBackend(backendId, tabletIdsNeedDrop);
+
+ Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+ LOG.info("finished to handle task report from backend {}-{}, "
+ + "diff task num: {}, cost: {} ms.",
+ backendId, be != null ? be.getHost() : "",
+ tabletIdsNeedDrop.size(),
+ (System.currentTimeMillis() - start));
+ }
+
+ // tabletIdsInFe, tablet is used in Primary or Secondary
+ // tabletIdsInBe, tablet report exceed time, need to check
+ // returns tabletIds need to drop
+ private Set<Long> diffTablets(Set<Long> tabletIdsInFe, Set<Long>
tabletIdsInBe) {
+ // tabletsInBe - tabletsInFe
+ Set<Long> result = new HashSet<>(tabletIdsInBe);
+ result.removeAll(tabletIdsInFe);
+ return result;
+ }
+
+ private static void deleteFromBackend(long backendId, Set<Long>
tabletIdsWillDrop) {
+ int deleteFromBackendCounter = 0;
+ AgentBatchTask batchTask = new AgentBatchTask();
+ for (Long tabletId : tabletIdsWillDrop) {
+ DropReplicaTask task = new DropReplicaTask(backendId, tabletId,
-1, -1, false);
+ batchTask.addTask(task);
+ LOG.info("delete tablet[{}] from backend[{}]", tabletId,
backendId);
+ ++deleteFromBackendCounter;
+ }
+
+ if (batchTask.getTaskNum() != 0) {
+ AgentTaskExecutor.submit(batchTask);
+ }
+
+ LOG.info("delete {} tablet(s) from backend[{}]",
deleteFromBackendCounter, backendId);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 35b6a230b24..a4bbe763f60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudTablet;
+import org.apache.doris.cloud.master.CloudReportHandler;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.DeleteJob;
@@ -76,7 +77,7 @@ import java.util.stream.Collectors;
public class MasterImpl {
private static final Logger LOG = LogManager.getLogger(MasterImpl.class);
- private ReportHandler reportHandler = new ReportHandler();
+ private ReportHandler reportHandler = Config.isCloudMode() ? new
CloudReportHandler() : new ReportHandler();
public MasterImpl() {
reportHandler.start();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d773cb5850e..1c8f51bd4eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -162,6 +162,7 @@ public class ReportHandler extends Daemon {
Map<Long, TTablet> tablets = null;
Map<Long, Long> partitionsVersion = null;
long reportVersion = -1;
+ long numTablets = 0;
ReportType reportType = null;
@@ -188,6 +189,12 @@ public class ReportHandler extends Daemon {
Env.getCurrentSystemInfo().updateBackendReportVersion(beId,
reportVersion, -1L, -1L, false);
}
+ if (tablets == null) {
+ numTablets = request.isSetNumTablets() ? request.getNumTablets() :
0;
+ } else {
+ numTablets = request.isSetNumTablets() ? request.getNumTablets() :
tablets.size();
+ }
+
if (request.isSetPartitionsVersion()) {
partitionsVersion = request.getPartitionsVersion();
}
@@ -206,7 +213,7 @@ public class ReportHandler extends Daemon {
ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks,
tablets, partitionsVersion,
reportVersion, request.getStoragePolicy(),
request.getResource(), request.getNumCores(),
- request.getPipelineExecutorSize());
+ request.getPipelineExecutorSize(), numTablets);
try {
putToQueue(reportTask);
} catch (Exception e) {
@@ -294,12 +301,13 @@ public class ReportHandler extends Daemon {
private List<TStorageResource> storageResources;
private int cpuCores;
private int pipelineExecutorSize;
+ private long numTablets;
public ReportTask(long beId, ReportType reportType, Map<TTaskType,
Set<Long>> tasks,
Map<String, TDisk> disks, Map<Long, TTablet> tablets,
Map<Long, Long> partitionsVersion, long reportVersion,
List<TStoragePolicy> storagePolicies, List<TStorageResource>
storageResources, int cpuCores,
- int pipelineExecutorSize) {
+ int pipelineExecutorSize, long numTablets) {
this.beId = beId;
this.reportType = reportType;
this.tasks = tasks;
@@ -311,6 +319,7 @@ public class ReportHandler extends Daemon {
this.storageResources = storageResources;
this.cpuCores = cpuCores;
this.pipelineExecutorSize = pipelineExecutorSize;
+ this.numTablets = numTablets;
}
@Override
@@ -336,7 +345,7 @@ public class ReportHandler extends Daemon {
if (partitions == null) {
partitions = Maps.newHashMap();
}
- ReportHandler.tabletReport(beId, tablets, partitions,
reportVersion);
+ tabletReport(beId, tablets, partitions, reportVersion,
numTablets);
}
}
}
@@ -471,8 +480,8 @@ public class ReportHandler extends Daemon {
}
// public for fe ut
- public static void tabletReport(long backendId, Map<Long, TTablet>
backendTablets,
- Map<Long, Long> backendPartitionsVersion, long
backendReportVersion) {
+ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
+ Map<Long, Long> backendPartitionsVersion, long
backendReportVersion, long numTablets) {
long start = System.currentTimeMillis();
LOG.info("backend[{}] reports {} tablet(s). report version: {}",
backendId, backendTablets.size(), backendReportVersion);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 494232ef38c..f4c33d75cdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2584,7 +2584,7 @@ public class StmtExecutor {
List<Long> tabletIdList = new ArrayList<Long>();
Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
.getCloudTabletRebalancer()
-
.getSnapshotTabletsByBeId(backend.getId());
+
.getSnapshotTabletsInPrimaryByBeId(backend.getId());
allTabletIds.forEach(tabletId -> {
if (beTabletIds.contains(tabletId)) {
tabletIdList.add(tabletId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 89f55239f7f..fb6853e83c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -253,6 +253,7 @@ public class HeartbeatMgr extends MasterDaemon {
if (Config.isCloudMode()) {
String cloudUniqueId =
backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID);
copiedMasterInfo.setCloudUniqueId(cloudUniqueId);
+
copiedMasterInfo.setTabletReportInactiveDurationMs(Config.rehash_tablet_after_be_dead_seconds);
}
THeartbeatResult result;
if (!FeConstants.runningUnitTest) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
index 1ac497dbebe..423c839faa2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
@@ -105,8 +105,8 @@ public class RepairVersionTest extends TestWithFeService {
Map<Long, TTablet> tablets = Maps.newHashMap();
tablets.put(tablet.getId(), tTablet);
Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
-
- ReportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L);
+ ReportHandler reportHandler = new ReportHandler();
+ reportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L, tablets.size());
Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
@@ -135,12 +135,12 @@ public class RepairVersionTest extends TestWithFeService {
tTablet.addToTabletInfos(tTabletInfo);
Map<Long, TTablet> tablets = Maps.newHashMap();
tablets.put(tablet.getId(), tTablet);
-
- ReportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L);
+ ReportHandler reportHandler = new ReportHandler();
+ reportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L, tablets.size());
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately",
new DebugPoint());
- ReportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L);
+ reportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L, tablets.size());
Assertions.assertEquals(replica.getVersion() + 1,
replica.getLastFailedVersion());
Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
diff --git a/gensrc/thrift/HeartbeatService.thrift
b/gensrc/thrift/HeartbeatService.thrift
index c03f04a6543..acdc608f21b 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -41,6 +41,8 @@ struct TMasterInfo {
9: optional list<TFrontendInfo> frontend_infos
10: optional string meta_service_endpoint;
11: optional string cloud_unique_id;
+ // See FE config rehash_tablet_after_be_dead_seconds in Config.java for the meaning.
NOTE(review): that config is in seconds but this field is named *_ms; confirm the unit.
+ 12: optional i64 tablet_report_inactive_duration_ms;
}
struct TBackendInfo {
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index ecedf0ee1af..9d8cd9111ba 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -114,6 +114,8 @@ struct TReportRequest {
11: i32 num_cores
12: i32 pipeline_executor_size
13: optional map<Types.TPartitionId, Types.TVersion> partitions_version
+ // number of tablets on the BE; in cloud mode num_tablets may differ from tablet_list.size()
+ 14: optional i64 num_tablets
}
struct TMasterResult {
diff --git
a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
new file mode 100644
index 00000000000..4dc847d603a
--- /dev/null
+++
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+
+suite('test_clean_tablet_when_drop_force_table', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=5'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1'
+ ]
+ options.setFeNum(3)
+ options.setBeNum(3)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def backendIdToHost = { ->
+ def spb = sql_return_maparray """SHOW BACKENDS"""
+ def beIdToHost = [:]
+ spb.each {
+ beIdToHost[it.BackendId] = it.Host
+ }
+ beIdToHost
+ }
+
+ def getTabletAndBeHostFromFe = { table ->
+ def result = sql_return_maparray """SHOW TABLETS FROM $table"""
+ def bes = backendIdToHost.call()
+ // tablet : host
+ def ret = [:]
+ result.each {
+ ret[it.TabletId] = bes[it.BackendId]
+ }
+ ret
+ }
+
+ def getTabletAndBeHostFromBe = { ->
+ def bes = cluster.getAllBackends()
+ def ret = [:]
+ bes.each { be ->
+ //
{"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
+ def data =
Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
+ def tablets = data.tablets.collect { it.tablet_id as String }
+ tablets.each{
+ ret[it] = data.host
+ }
+ }
+ ret
+ }
+
+ def testCase = { table, waitTime, useDp=false->
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL
+ )
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into $table values (1, 1), (2, 2), (3, 3)
+ """
+
+ for (int i = 0; i < 5; i++) {
+ sql """
+ select * from $table
+ """
+ }
+
+ // before drop table force
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe = getTabletAndBeHostFromBe.call()
+ logger.info("fe tablets {}, be tablets {}", beforeGetFromFe,
beforeGetFromBe)
+ beforeGetFromFe.each {
+ assertTrue(beforeGetFromBe.containsKey(it.Key))
+ assertEquals(beforeGetFromBe[it.Key], it.Value)
+ }
+ if (useDp) {
+
GetDebugPoint().enableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
+ }
+ // after drop table force
+
+ sql """
+ DROP TABLE $table FORCE
+ """
+ def futrue
+ if (useDp) {
+ futrue = thread {
+ sleep(10 * 1000)
+
GetDebugPoint().disableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
+ }
+ }
+ def start = System.currentTimeMillis() / 1000
+ // wait until the dropped table's tablets can no longer be found on any BE
+ dockerAwaitUntil(50) {
+ def beTablets = getTabletAndBeHostFromBe.call().keySet()
+ logger.info("before drop tablets {}, after tablets {}",
beforeGetFromFe, beTablets)
+ beforeGetFromFe.keySet().every {
!getTabletAndBeHostFromBe.call().containsKey(it) }
+ }
+ logger.info("table {}, cost {}s", table, System.currentTimeMillis() /
1000 - start)
+ assertTrue(System.currentTimeMillis() / 1000 - start > waitTime)
+ if (useDp) {
+ futrue.get()
+ }
+ }
+
+ docker(options) {
+ // because rehash_tablet_after_be_dead_seconds=5
+ testCase("test_clean_tablet_when_drop_force_table_1", 5)
+ // report retry
+ testCase("test_clean_tablet_when_drop_force_table_2", 10, true)
+ }
+}
diff --git
a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
new file mode 100644
index 00000000000..4a44b317cc2
--- /dev/null
+++
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+
+suite('test_clean_tablet_when_rebalance', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ def rehashTime = 100
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1'
+ ]
+ options.feConfigs.add("rehash_tablet_after_be_dead_seconds=$rehashTime")
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1'
+ ]
+ options.setFeNum(3)
+ options.setBeNum(3)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def choseDeadBeIndex = 1
+ def table = "test_clean_tablet_when_rebalance"
+
+ def backendIdToHost = { ->
+ def spb = sql_return_maparray """SHOW BACKENDS"""
+ def beIdToHost = [:]
+ spb.each {
+ beIdToHost[it.BackendId] = it.Host
+ }
+ beIdToHost
+ }
+
+ def getTabletAndBeHostFromFe = { ->
+ def result = sql_return_maparray """SHOW TABLETS FROM $table"""
+ def bes = backendIdToHost.call()
+ // tablet : host
+ def ret = [:]
+ result.each {
+ ret[it.TabletId] = bes[it.BackendId]
+ }
+ ret
+ }
+
+ def getTabletAndBeHostFromBe = { ->
+ def bes = cluster.getAllBackends()
+ def ret = [:]
+ bes.each { be ->
+ //
{"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
+ def data =
Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
+ def tablets = data.tablets.collect { it.tablet_id as String }
+ tablets.each{
+ ret[it] = data.host
+ }
+ }
+ ret
+ }
+
+ def testCase = { deadTime ->
+ boolean beDeadLong = deadTime > rehashTime ? true : false
+ logger.info("begin exec beDeadLong {}", beDeadLong)
+
+ for (int i = 0; i < 5; i++) {
+ sql """
+ select * from $table
+ """
+ }
+
+ def beforeGetFromFe = getTabletAndBeHostFromFe()
+ def beforeGetFromBe = getTabletAndBeHostFromBe.call()
+ logger.info("before fe tablets {}, be tablets {}", beforeGetFromFe,
beforeGetFromBe)
+ beforeGetFromFe.each {
+ assertTrue(beforeGetFromBe.containsKey(it.Key))
+ assertEquals(beforeGetFromBe[it.Key], it.Value)
+ }
+
+ cluster.stopBackends(choseDeadBeIndex)
+ dockerAwaitUntil(50) {
+ def bes = sql_return_maparray("SHOW TABLETS FROM ${table}")
+ .collect { it.BackendId }
+ .unique()
+ logger.info("bes {}", bes)
+ bes.size() == 2
+ }
+
+ if (beDeadLong) {
+ setFeConfig('enable_cloud_partition_balance', false)
+ setFeConfig('enable_cloud_table_balance', false)
+ setFeConfig('enable_cloud_global_balance', false)
+ }
+ sleep(deadTime * 1000)
+
+ cluster.startBackends(choseDeadBeIndex)
+
+ dockerAwaitUntil(50) {
+ def bes = sql_return_maparray("SHOW TABLETS FROM ${table}")
+ .collect { it.BackendId }
+ .unique()
+ logger.info("bes {}", bes)
+ bes.size() == (beDeadLong ? 2 : 3)
+ }
+ for (int i = 0; i < 5; i++) {
+ sql """
+ select * from $table
+ """
+ sleep(1000)
+ }
+ beforeGetFromFe = getTabletAndBeHostFromFe()
+ beforeGetFromBe = getTabletAndBeHostFromBe.call()
+ logger.info("after fe tablets {}, be tablets {}", beforeGetFromFe,
beforeGetFromBe)
+ beforeGetFromFe.each {
+ assertTrue(beforeGetFromBe.containsKey(it.Key))
+ assertEquals(beforeGetFromBe[it.Key], it.Value)
+ }
+ }
+
+ docker(options) {
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL
+ )
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into $table values (1, 1), (2, 2), (3, 3)
+ """
+ // rehash_tablet_after_be_dead_seconds is set to rehashTime (100) in the FE configs above
+ // be-1 dead, but not dead for a long time
+ testCase(5)
+ // be-1 dead, and dead for a long time
+ testCase(200)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]