This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 783c7d38658ad5169868a2d8028b23f2f73da07a
Author: plat1ko <[email protected]>
AuthorDate: Wed Feb 22 16:21:11 2023 +0800

    [fix](replica) Fix inconsistent replica id between BE and FE in corner case of tablet rebalance (#16889)
---
 be/src/olap/tablet_manager.cpp                      | 14 +++++---------
 be/src/olap/tablet_manager.h                        |  3 ++-
 be/src/olap/task/engine_clone_task.cpp              | 17 +++++++++++++----
 .../java/org/apache/doris/master/ReportHandler.java |  9 +++++++--
 4 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 9d111dd697..36b0eb64a9 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -436,8 +436,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
     auto& shard = _get_tablets_shard(tablet_id);
     std::lock_guard wrlock(shard.lock);
     if (shard.tablets_under_clone.count(tablet_id) > 0) {
-        LOG(INFO) << "tablet " << tablet_id << " is under clone, skip drop task";
-        return Status::Aborted("aborted");
+        return Status::Aborted("tablet {} is under clone, skip drop task", tablet_id);
     }
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     return _drop_tablet_unlocked(tablet_id, replica_id, false, is_drop_table_or_partition);
@@ -459,12 +458,9 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
     // We should compare replica id to avoid dropping new cloned tablet.
     // Iff request replica id is 0, FE may be an older release, then we drop this tablet as before.
     if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) {
-        LOG(WARNING) << "fail to drop tablet because replica id not match. "
-                     << "tablet_id=" << tablet_id << ", replica_id=" << to_drop_tablet->replica_id()
-                     << ", request replica_id=" << replica_id;
-        return Status::Aborted("aborted");
+        return Status::Aborted("replica_id not match({} vs {})", to_drop_tablet->replica_id(),
+                               replica_id);
     }
-
     _remove_tablet_from_partition(to_drop_tablet);
     tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
     tablet_map.erase(tablet_id);
@@ -1043,10 +1039,10 @@ Status TabletManager::start_trash_sweep() {
     return Status::OK();
 } // start_trash_sweep
 
-void TabletManager::register_clone_tablet(int64_t tablet_id) {
+bool TabletManager::register_clone_tablet(int64_t tablet_id) {
     tablets_shard& shard = _get_tablets_shard(tablet_id);
     std::lock_guard<std::shared_mutex> wrlock(shard.lock);
-    shard.tablets_under_clone.insert(tablet_id);
+    return shard.tablets_under_clone.insert(tablet_id).second;
 }
 
 void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index bbb298cae2..b01a5141d5 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -133,7 +133,8 @@ public:
 
     void obtain_specific_quantity_tablets(std::vector<TabletInfo>& tablets_info, int64_t num);
 
-    void register_clone_tablet(int64_t tablet_id);
+    // Returns `true` if the tablet was newly registered, `false` if it is already under clone.
+    bool register_clone_tablet(int64_t tablet_id);
     void unregister_clone_tablet(int64_t tablet_id);
 
     void get_tablets_distribution_on_different_disks(
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 4bb86501b0..7550366412 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -63,7 +63,9 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
 Status EngineCloneTask::execute() {
     // register the tablet to avoid it is deleted by gc thread during clone process
     SCOPED_ATTACH_TASK(_mem_tracker);
-    StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id);
+    if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
+        return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
+    }
     Status st = _do_clone();
     StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
     return st;
@@ -81,6 +83,13 @@ Status EngineCloneTask::_do_clone() {
     std::vector<Version> missed_versions;
     // try to repair a tablet with missing version
     if (tablet != nullptr) {
+        if (tablet->replica_id() != _clone_req.replica_id) {
+            // `tablet` may be a replica that FE has already dropped, e.g. BE1 migrates the replica
+            // of tablet_1 to BE2; before BE1 drops it, another replica of tablet_1 moves to BE1.
+            // Cloning onto a dropped replica could leave FE and BE with mismatched replica ids.
+            return Status::InternalError("replica_id not match({} vs {})", tablet->replica_id(),
+                                         _clone_req.replica_id);
+        }
         std::shared_lock migration_rlock(tablet->get_migration_lock(), std::try_to_lock);
         if (!migration_rlock.owns_lock()) {
             return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR);
@@ -96,7 +105,7 @@ Status EngineCloneTask::_do_clone() {
         // completed. Or remote be will just return header not the rowset files. clone will failed.
         if (missed_versions.empty()) {
             LOG(INFO) << "missed version size = 0, skip clone and return success. tablet_id="
-                      << _clone_req.tablet_id;
+                      << _clone_req.tablet_id << " replica_id=" << _clone_req.replica_id;
             _set_tablet_info(is_new_tablet);
             return Status::OK();
         }
@@ -104,7 +113,8 @@ Status EngineCloneTask::_do_clone() {
         LOG(INFO) << "clone to existed tablet. missed_versions_size=" << missed_versions.size()
                   << ", allow_incremental_clone=" << allow_incremental_clone
                   << ", signature=" << _signature << ", tablet_id=" << _clone_req.tablet_id
-                  << ", committed_version=" << _clone_req.committed_version;
+                  << ", committed_version=" << _clone_req.committed_version
+                  << ", replica_id=" << _clone_req.replica_id;
 
         // try to download missing version from src backend.
         // if tablet on src backend does not contains missing version, it will download all versions,
@@ -112,7 +122,6 @@ Status EngineCloneTask::_do_clone() {
         RETURN_IF_ERROR(_make_and_download_snapshots(*(tablet->data_dir()), local_data_path,
                                                      &src_host, &src_file_path, missed_versions,
                                                      &allow_incremental_clone));
-
         RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, _clone_req.committed_version,
                                       allow_incremental_clone));
     } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index a38b91ff78..6741e3bfcf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -921,6 +921,12 @@ public class ReportHandler extends Daemon {
                 throw new MetaNotFoundException("tablet[" + tabletId + "] does not exist");
             }
 
+            // check replica id
+            long replicaId = backendTabletInfo.getReplicaId();
+            if (replicaId <= 0) {
+                throw new MetaNotFoundException("replica id is invalid, tablet id: " + tabletId);
+            }
+
             long visibleVersion = partition.getVisibleVersion();
 
             // check replica version
@@ -964,8 +970,7 @@ public class ReportHandler extends Daemon {
             } else if (version < partition.getCommittedVersion()) {
                 lastFailedVersion = partition.getCommittedVersion();
             }
-
-            long replicaId = Env.getCurrentEnv().getNextId();
+            // use replicaId reported by BE to maintain replica meta consistent between FE and BE
             Replica replica = new Replica(replicaId, backendId, version, schemaHash, dataSize,
                     remoteDataSize, rowCount, ReplicaState.NORMAL, lastFailedVersion, version);
 
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
