This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c701ec83a42 [fix](migrate disk) fix migrate disk lost data during publish version (#29887)
c701ec83a42 is described below
commit c701ec83a42db15e9436bcac132294aba29893f4
Author: yujun <[email protected]>
AuthorDate: Sun Jan 14 10:27:05 2024 +0800
[fix](migrate disk) fix migrate disk lost data during publish version (#29887)
Co-authored-by: Yongqiang YANG <[email protected]>
---
be/src/agent/task_worker_pool.cpp | 12 +++-
be/src/common/config.cpp | 5 +-
be/src/common/config.h | 5 +-
be/src/olap/tablet.h | 4 +-
be/src/olap/task/engine_publish_version_task.cpp | 17 +++++
be/src/olap/task/engine_storage_migration_task.cpp | 16 ++---
be/src/olap/task/engine_storage_migration_task.h | 3 +-
be/src/util/debug_points.h | 14 ++++
.../test_migrate_disk_with_publish_version.out | 11 +++
.../test_migrate_disk_with_publish_version.groovy | 84 ++++++++++++++++++++++
10 files changed, 151 insertions(+), 20 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index a845b6253f9..f40fe73758f 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -856,7 +856,9 @@ void check_consistency_callback(StorageEngine& engine,
const TAgentTaskRequest&
void report_task_callback(const TMasterInfo& master_info) {
TReportRequest request;
- random_sleep(5);
+ if (config::report_random_wait) {
+ random_sleep(5);
+ }
request.__isset.tasks = true;
{
std::lock_guard lock(s_task_signatures_mtx);
@@ -880,7 +882,9 @@ void report_disk_callback(StorageEngine& engine, const
TMasterInfo& master_info)
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests
at the same time
// and can not be processed.
- random_sleep(5);
+ if (config::report_random_wait) {
+ random_sleep(5);
+ }
TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
@@ -914,7 +918,9 @@ void report_disk_callback(StorageEngine& engine, const
TMasterInfo& master_info)
}
void report_tablet_callback(StorageEngine& engine, const TMasterInfo&
master_info) {
- random_sleep(5);
+ if (config::report_random_wait) {
+ random_sleep(5);
+ }
TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 17ac1905dd6..2d418b2bf24 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -178,6 +178,9 @@ DEFINE_Int32(download_worker_count, "1");
DEFINE_Int32(make_snapshot_worker_count, "5");
// the count of thread to release snapshot
DEFINE_Int32(release_snapshot_worker_count, "5");
+// report random wait a little time to avoid FE receiving multiple be reports
at the same time.
+// do not set it to false for production environment
+DEFINE_mBool(report_random_wait, "true");
// the interval time(seconds) for agent report tasks signature to FE
DEFINE_mInt32(report_task_interval_seconds, "10");
// the interval time(seconds) for refresh storage policy from FE
@@ -192,8 +195,6 @@ DEFINE_mInt32(max_download_speed_kbps, "50000");
DEFINE_mInt32(download_low_speed_limit_kbps, "50");
// download low speed time(seconds)
DEFINE_mInt32(download_low_speed_time, "300");
-// sleep time for one second
-DEFINE_Int32(sleep_one_second, "1");
// log dir
DEFINE_String(sys_log_dir, "${DORIS_HOME}/log");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0f7c47f8ec3..7ae583cedb6 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -226,6 +226,9 @@ DECLARE_Int32(download_worker_count);
DECLARE_Int32(make_snapshot_worker_count);
// the count of thread to release snapshot
DECLARE_Int32(release_snapshot_worker_count);
+// report random wait a little time to avoid FE receiving multiple be reports
at the same time.
+// do not set it to false for production environment
+DECLARE_mBool(report_random_wait);
// the interval time(seconds) for agent report tasks signature to FE
DECLARE_mInt32(report_task_interval_seconds);
// the interval time(seconds) for refresh storage policy from FE
@@ -240,8 +243,6 @@ DECLARE_mInt32(max_download_speed_kbps);
DECLARE_mInt32(download_low_speed_limit_kbps);
// download low speed time(seconds)
DECLARE_mInt32(download_low_speed_time);
-// sleep time for one second
-DECLARE_Int32(sleep_one_second);
// log dir
DECLARE_String(sys_log_dir);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 8492b0b158e..d6ad0285233 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -195,7 +195,7 @@ public:
std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
std::mutex& get_cumulative_compaction_lock() { return
_cumulative_compaction_lock; }
- std::shared_mutex& get_migration_lock() { return _migration_lock; }
+ std::shared_timed_mutex& get_migration_lock() { return _migration_lock; }
std::mutex& get_schema_change_lock() { return _schema_change_lock; }
@@ -625,7 +625,7 @@ private:
std::mutex _base_compaction_lock;
std::mutex _cumulative_compaction_lock;
std::mutex _schema_change_lock;
- std::shared_mutex _migration_lock;
+ std::shared_timed_mutex _migration_lock;
std::mutex _build_inverted_index_lock;
// In unique key table with MoW, we should guarantee that only one
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index ac3cc82e526..78f5cb9c328 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -348,6 +348,13 @@
TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
}
void TabletPublishTxnTask::handle() {
+ std::shared_lock migration_rlock(_tablet->get_migration_lock(),
std::chrono::seconds(5));
+ if (!migration_rlock.owns_lock()) {
+ _result = Status::Error<TRY_LOCK_FAILED, false>("got migration_rlock
failed");
+ LOG(WARNING) << "failed to publish version. tablet_id=" <<
_tablet_info.tablet_id
+ << ", txn_id=" << _transaction_id << ", res=" << _result;
+ return;
+ }
std::unique_lock<std::mutex>
rowset_update_lock(_tablet->get_rowset_update_lock(),
std::defer_lock);
if (_tablet->enable_unique_key_merge_on_write()) {
@@ -364,6 +371,8 @@ void TabletPublishTxnTask::handle() {
return;
}
+ DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets",
DBUG_BLOCK);
+
// add visible rowset to tablet
int64_t t1 = MonotonicMicros();
_result = _tablet->add_inc_rowset(_rowset);
@@ -389,6 +398,12 @@ void TabletPublishTxnTask::handle() {
}
void AsyncTabletPublishTask::handle() {
+ std::shared_lock migration_rlock(_tablet->get_migration_lock(),
std::chrono::seconds(5));
+ if (!migration_rlock.owns_lock()) {
+ LOG(WARNING) << "failed to publish version. tablet_id=" <<
_tablet->tablet_id()
+ << ", txn_id=" << _transaction_id << ", got
migration_rlock failed";
+ return;
+ }
std::lock_guard<std::mutex> wrlock(_tablet->get_rowset_update_lock());
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -409,6 +424,8 @@ void AsyncTabletPublishTask::handle() {
return;
}
+ DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets",
DBUG_BLOCK);
+
// add visible rowset to tablet
int64_t t1 = MonotonicMicros();
publish_status = _tablet->add_inc_rowset(rowset);
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index 8cd5717afb2..efa85406c69 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -49,8 +49,6 @@ namespace doris {
using std::stringstream;
-const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60;
-
EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr&
tablet,
DataDir* dest_store)
: _tablet(tablet), _dest_store(dest_store) {
@@ -115,16 +113,15 @@ Status EngineStorageMigrationTask::_check_running_txns() {
}
Status EngineStorageMigrationTask::_check_running_txns_until_timeout(
- std::unique_lock<std::shared_mutex>* migration_wlock) {
+ std::unique_lock<std::shared_timed_mutex>* migration_wlock) {
// caller should not hold migration lock, and 'migration_wlock' should not
be nullptr
// ownership of the migration_wlock is transferred to the caller if check
succ
DCHECK_NE(migration_wlock, nullptr);
Status res = Status::OK();
- int try_times = 1;
do {
// to avoid invalid loops, the lock is guaranteed to be acquired here
{
- std::unique_lock<std::shared_mutex>
wlock(_tablet->get_migration_lock());
+ std::unique_lock<std::shared_timed_mutex>
wlock(_tablet->get_migration_lock());
if (_tablet->tablet_state() == TABLET_SHUTDOWN) {
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("tablet
{} has deleted",
_tablet->tablet_id());
@@ -136,8 +133,7 @@ Status
EngineStorageMigrationTask::_check_running_txns_until_timeout(
return res;
}
}
- sleep(std::min(config::sleep_one_second * try_times,
CHECK_TXNS_MAX_WAIT_TIME_SECS));
- ++try_times;
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
} while (!_is_timeout());
return res;
}
@@ -224,8 +220,8 @@ Status EngineStorageMigrationTask::_migrate() {
uint64_t shard = 0;
std::string full_path;
{
- std::unique_lock<std::shared_mutex>
migration_wlock(_tablet->get_migration_lock(),
- std::try_to_lock);
+ std::unique_lock<std::shared_timed_mutex>
migration_wlock(_tablet->get_migration_lock(),
+
std::chrono::seconds(1));
if (!migration_wlock.owns_lock()) {
return Status::InternalError("could not own migration_wlock");
}
@@ -260,7 +256,7 @@ Status EngineStorageMigrationTask::_migrate() {
if (!res.ok()) {
break;
}
- std::unique_lock<std::shared_mutex> migration_wlock;
+ std::unique_lock<std::shared_timed_mutex> migration_wlock;
res = _check_running_txns_until_timeout(&migration_wlock);
if (!res.ok()) {
break;
diff --git a/be/src/olap/task/engine_storage_migration_task.h
b/be/src/olap/task/engine_storage_migration_task.h
index 2831fc11df4..c15b0576701 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -54,7 +54,8 @@ private:
Status _check_running_txns();
// caller should not hold migration lock, and 'migration_wlock' should not
be nullptr
// ownership of the migration lock is transferred to the caller if check
succ
- Status
_check_running_txns_until_timeout(std::unique_lock<std::shared_mutex>*
migration_wlock);
+ Status _check_running_txns_until_timeout(
+ std::unique_lock<std::shared_timed_mutex>* migration_wlock);
// if the size less than threshold, return true
bool _is_rowsets_size_less_than_threshold(
diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h
index 1106a548f8d..7af666f1196 100644
--- a/be/src/util/debug_points.h
+++ b/be/src/util/debug_points.h
@@ -19,9 +19,11 @@
#include <atomic>
#include <boost/lexical_cast.hpp>
+#include <chrono>
#include <functional>
#include <map>
#include <memory>
+#include <thread>
#include <type_traits>
#include "common/compiler_util.h"
@@ -33,10 +35,22 @@
if (UNLIKELY(config::enable_debug_points)) { \
auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
if (dp) { \
+ [[maybe_unused]] auto DP_NAME = debug_point_name; \
code; \
} \
}
+// define some common debug actions
+// usage example: DBUG_EXECUTE_IF("xxx", DBUG_BLOCK);
+#define DBUG_BLOCK \
+ { \
+ LOG(INFO) << "start debug block " << DP_NAME; \
+ while (DebugPoints::instance()->is_enable(DP_NAME)) { \
+ std::this_thread::sleep_for(std::chrono::milliseconds(10)); \
+ } \
+ LOG(INFO) << "end debug block " << DP_NAME; \
+ }
+
namespace doris {
struct DebugPoint {
diff --git
a/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out
b/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out
new file mode 100644
index 00000000000..004ada24cbc
--- /dev/null
+++ b/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_1 --
+1 10
+
+-- !select_2 --
+1 10
+
+-- !select_3 --
+1 10
+2 20
+
diff --git
a/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy
b/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy
new file mode 100644
index 00000000000..e5b22b791af
--- /dev/null
+++
b/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite('test_migrate_disk_with_publish_version') {
+ def checkTabletOnHDD = { isOnHdd ->
+ sleep 5000
+
+ def targetPathHash = [] as Set
+ sql_return_maparray("SHOW PROC '/backends'").each {
+ def paths = sql_return_maparray("SHOW PROC
'/backends/${it.BackendId}'")
+ for (def path : paths) {
+ if (path.RootPath.endsWith(isOnHdd ? 'HDD' : 'SSD')) {
+ targetPathHash.add(path.PathHash)
+ }
+ }
+ }
+
+ def tablets = sql_return_maparray 'SHOW TABLETS FROM tbl'
+ tablets.each {
+ assertTrue(it.PathHash in targetPathHash, "tablet path hash
${it.PathHash} not in ${targetPathHash}")
+ }
+ }
+
+ def options = new ClusterOptions()
+ options.enableDebugPoints()
+ options.beConfigs += [
+ 'report_random_wait=false',
+ 'report_tablet_interval_seconds=1',
+ 'report_disk_state_interval_seconds=1'
+ ]
+ options.beDisks = ['HDD=1', 'SSD=1' ]
+ docker(options) {
+ cluster.checkBeIsAlive(1, true)
+ cluster.checkBeIsAlive(2, true)
+ cluster.checkBeIsAlive(3, true)
+ sleep 2000
+
+ sql 'SET GLOBAL insert_visible_timeout_ms = 2000'
+ sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' =
'1000')"
+
+ sql 'CREATE TABLE tbl (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS
1'
+ sql 'INSERT INTO tbl VALUES (1, 10)'
+
+ checkTabletOnHDD true
+
+ // add debug point, txn will block
+ cluster.injectDebugPoints(NodeType.FE,
['PublishVersionDaemon.stop_publish':null])
+ cluster.injectDebugPoints(NodeType.BE,
['EnginePublishVersionTask.handle.block_add_rowsets':null])
+ sql 'INSERT INTO tbl VALUES (2, 20)'
+
+ sql "ALTER TABLE tbl MODIFY PARTITION(*) SET ( 'storage_medium' =
'ssd' )"
+ // tablet has unfinished txn, it couldn't migrate among disks
+ checkTabletOnHDD true
+
+ order_qt_select_1 'SELECT * FROM tbl'
+
+ cluster.clearFrontendDebugPoints()
+ // tablet finished all txns, but publish thread hold the migrate lock,
migrate will failed
+ checkTabletOnHDD true
+ order_qt_select_2 'SELECT * FROM tbl'
+
+ cluster.clearBackendDebugPoints()
+ // tablet finished all txns, and publish thread not hold migrate lock,
migrate should succ
+ checkTabletOnHDD false
+ order_qt_select_3 'SELECT * FROM tbl'
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]