This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 3b9394a8c79 [improvement](tablet scheduler) Adjust tablet sched
priority to help load data succ #38528 (#38884)
3b9394a8c79 is described below
commit 3b9394a8c7913f63fff47955196eadc02f5a292e
Author: yujun <[email protected]>
AuthorDate: Tue Aug 6 02:13:47 2024 +0800
[improvement](tablet scheduler) Adjust tablet sched priority to help load
data succ #38528 (#38884)
cherry pick from #38528
---
.../main/java/org/apache/doris/common/Config.java | 13 +
.../java/org/apache/doris/catalog/OlapTable.java | 16 +-
.../main/java/org/apache/doris/catalog/Tablet.java | 234 ++++++++++---
.../clone/ColocateTableCheckerAndBalancer.java | 18 +-
.../java/org/apache/doris/clone/TabletChecker.java | 19 +-
.../org/apache/doris/clone/TabletSchedCtx.java | 85 +++--
.../org/apache/doris/clone/TabletScheduler.java | 164 ++++++----
.../doris/common/proc/TabletHealthProcDir.java | 12 +-
.../common/proc/TabletSchedulerDetailProcDir.java | 3 +-
.../org/apache/doris/master/ReportHandler.java | 19 +-
.../plans/commands/insert/OlapInsertExecutor.java | 2 +-
.../org/apache/doris/planner/OlapTableSink.java | 11 +-
.../doris/transaction/DatabaseTransactionMgr.java | 9 +
.../java/org/apache/doris/catalog/TabletTest.java | 17 +-
.../org/apache/doris/clone/TabletHealthTest.java | 361 +++++++++++++++++++++
.../doris/cluster/DecommissionBackendTest.java | 7 +-
.../apache/doris/utframe/TestWithFeService.java | 56 ++--
17 files changed, 841 insertions(+), 205 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d55ac52ebfd..e26f52413db 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1011,6 +1011,19 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int tablet_further_repair_max_times = 5;
+ /**
+ * if a load txn on a tablet failed within this many seconds, the tablet gets a higher repair priority.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static long tablet_recent_load_failed_second = 30 * 60;
+
+ /**
+ * base time used to boost high priority tablet scheduler tasks;
+ * set a bigger value if the high priority effect should last longer.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static long tablet_schedule_high_priority_second = 30 * 60;
+
/**
* the default slot number per path for hdd in tablet scheduler
* TODO(cmy): remove this config and dynamically adjust it by clone task
statistic
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 12550899a46..cfc57a98991 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -34,7 +34,6 @@ import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Tablet.TabletStatus;
-import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -1841,11 +1840,11 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
return false;
}
- Pair<TabletStatus, TabletSchedCtx.Priority> statusPair =
tablet.getHealthStatusWithPriority(
- infoService, visibleVersion, replicaAlloc,
aliveBeIds);
- if (statusPair.first != TabletStatus.HEALTHY) {
+ TabletStatus status = tablet.getHealth(infoService,
visibleVersion,
+ replicaAlloc, aliveBeIds).status;
+ if (status != TabletStatus.HEALTHY) {
LOG.info("table {} is not stable because tablet {}
status is {}. replicas: {}",
- id, tablet.getId(), statusPair.first,
tablet.getReplicas());
+ id, tablet.getId(), status,
tablet.getReplicas());
return false;
}
}
@@ -2482,6 +2481,10 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
return tableProperty.getEnableUniqueKeyMergeOnWrite();
}
+ public boolean isUniqKeyMergeOnWrite() {
+ return getKeysType() == KeysType.UNIQUE_KEYS &&
getEnableUniqueKeyMergeOnWrite();
+ }
+
public boolean isDuplicateWithoutKey() {
return getKeysType() == KeysType.DUP_KEYS && getKeysNum() == 0;
}
@@ -2573,8 +2576,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf {
}
public boolean isDupKeysOrMergeOnWrite() {
- return keysType == KeysType.DUP_KEYS
- || (keysType == KeysType.UNIQUE_KEYS &&
getEnableUniqueKeyMergeOnWrite());
+ return keysType == KeysType.DUP_KEYS || isUniqKeyMergeOnWrite();
}
public void initAutoIncrementGenerator(long dbId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 08c89e14c37..9714ef15719 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -81,6 +81,36 @@ public class Tablet extends MetaObject implements Writable {
REPLICA_COMPACTION_TOO_SLOW // one replica's version count is much
more than other replicas;
}
+ public static class TabletHealth {
+ public TabletStatus status;
+ public TabletSchedCtx.Priority priority;
+
+ // num of alive replica with version complete
+ public int aliveAndVersionCompleteNum;
+
+ // NEED_FURTHER_REPAIR replica id
+ public long needFurtherRepairReplicaId;
+
+ // has an alive replica whose version is incomplete; such replicas should be repaired first
+ public boolean hasAliveAndVersionIncomplete;
+
+ // this tablet recent write failed, then increase its sched priority
+ public boolean hasRecentLoadFailed;
+
+ // this tablet want to add new replica, but not found target backend.
+ public boolean noPathForNewReplica;
+
+ public TabletHealth() {
+ status = null; // don't set for balance task
+ priority = TabletSchedCtx.Priority.NORMAL;
+ aliveAndVersionCompleteNum = 0;
+ needFurtherRepairReplicaId = -1L;
+ hasAliveAndVersionIncomplete = false;
+ hasRecentLoadFailed = false;
+ noPathForNewReplica = false;
+ }
+ }
+
@SerializedName(value = "id")
private long id;
@SerializedName(value = "replicas")
@@ -104,6 +134,16 @@ public class Tablet extends MetaObject implements Writable
{
// no need to persist
private long lastStatusCheckTime = -1;
+ // last time for load data fail
+ private long lastLoadFailedTime = -1;
+
+ // if the tablet wants to add a new replica but can't find any backend to place it on,
+ // mark this tablet. Repeated repair attempts on it would keep failing to schedule anyway.
+ // For example, a tablet has 3 replicas and 1 backend is dead, so the tablet's health status
+ // is REPLICA_MISSING. But if no other backend can hold the new replica, scheduling always fails.
+ // So don't increase this tablet's sched priority if it has no path for a new replica.
+ private long lastTimeNoPathForNewReplica = -1;
+
public Tablet() {
this(0L, new ArrayList<>());
}
@@ -466,10 +506,8 @@ public class Tablet extends MetaObject implements Writable
{
* 1. healthy replica num is equal to replicationNum
* 2. all healthy replicas are in right tag
*/
- public Pair<TabletStatus, TabletSchedCtx.Priority>
getHealthStatusWithPriority(SystemInfoService systemInfoService,
+ public TabletHealth getHealth(SystemInfoService systemInfoService,
long visibleVersion, ReplicaAllocation replicaAlloc, List<Long>
aliveBeIds) {
-
-
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
Map<Tag, Short> stableAllocMap = Maps.newHashMap();
Map<Tag, Short> stableVersionCompleteAllocMap = Maps.newHashMap();
@@ -480,16 +518,12 @@ public class Tablet extends MetaObject implements
Writable {
int stable = 0;
Replica needFurtherRepairReplica = null;
+ boolean hasAliveAndVersionIncomplete = false;
Set<String> hosts = Sets.newHashSet();
ArrayList<Long> versions = new ArrayList<>();
for (Replica replica : replicas) {
Backend backend =
systemInfoService.getBackend(replica.getBackendId());
- if (backend == null || !backend.isAlive() || !replica.isAlive()
- || checkHost(hosts, backend) || replica.tooSlow() ||
!backend.isMixNode()) {
- // this replica is not alive,
- // or if this replica is on same host with another replica, we
also treat it as 'dead',
- // so that Tablet Scheduler will create a new replica on
different host.
- // ATTN: Replicas on same host is a bug of previous Doris
version, so we fix it by this way.
+ if (!isReplicaAndBackendAlive(replica, backend, hosts)) {
continue;
}
@@ -514,13 +548,30 @@ public class Tablet extends MetaObject implements
Writable {
allocNum =
stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0);
stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum +
1));
+ } else {
+ hasAliveAndVersionIncomplete = true;
}
}
}
+ TabletHealth tabletHealth = new TabletHealth();
+ initTabletHealth(tabletHealth);
+ tabletHealth.aliveAndVersionCompleteNum = aliveAndVersionComplete;
+ tabletHealth.hasAliveAndVersionIncomplete =
hasAliveAndVersionIncomplete;
+ if (needFurtherRepairReplica != null) {
+ tabletHealth.needFurtherRepairReplicaId =
needFurtherRepairReplica.getId();
+ }
+
// 0. We can not choose a good replica as src to repair this tablet.
if (aliveAndVersionComplete == 0) {
- return Pair.of(TabletStatus.UNRECOVERABLE, Priority.VERY_HIGH);
+ tabletHealth.status = TabletStatus.UNRECOVERABLE;
+ return tabletHealth;
+ } else if (aliveAndVersionComplete < replicationNum &&
hasAliveAndVersionIncomplete) {
+ // not enough good replica, and there exists schedule available
replicas and version incomplete,
+ // no matter whether they tag is proper right, fix them
immediately.
+ tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
+ tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
+ return tabletHealth;
}
// 1. alive replicas are not enough
@@ -536,24 +587,32 @@ public class Tablet extends MetaObject implements
Writable {
// 3. aliveBackendsNum >= replicationNum: make sure after deleting,
// there will be at least one backend for new replica.
// 4. replicationNum > 1: if replication num is set to 1, do not
delete any replica, for safety reason
- return Pair.of(TabletStatus.FORCE_REDUNDANT,
TabletSchedCtx.Priority.VERY_HIGH);
- } else if (alive < (replicationNum / 2) + 1) {
- return Pair.of(TabletStatus.REPLICA_MISSING,
TabletSchedCtx.Priority.HIGH);
+ tabletHealth.status = TabletStatus.FORCE_REDUNDANT;
+ tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
+ return tabletHealth;
} else if (alive < replicationNum) {
- return Pair.of(TabletStatus.REPLICA_MISSING,
TabletSchedCtx.Priority.NORMAL);
+ tabletHealth.status = TabletStatus.REPLICA_MISSING;
+ tabletHealth.priority = alive < (replicationNum / 2) + 1 ?
TabletSchedCtx.Priority.VERY_HIGH
+ : TabletSchedCtx.Priority.NORMAL;
+ return tabletHealth;
}
// 2. version complete replicas are not enough
- if (aliveAndVersionComplete < (replicationNum / 2) + 1) {
- return Pair.of(TabletStatus.VERSION_INCOMPLETE,
TabletSchedCtx.Priority.HIGH);
- } else if (aliveAndVersionComplete < replicationNum) {
- return Pair.of(TabletStatus.VERSION_INCOMPLETE,
TabletSchedCtx.Priority.NORMAL);
+ if (aliveAndVersionComplete < replicationNum) {
+ tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
+ tabletHealth.priority = aliveAndVersionComplete < (replicationNum / 2) + 1 ?
TabletSchedCtx.Priority.HIGH
+ : TabletSchedCtx.Priority.NORMAL; // HIGH when fewer than a majority of replicas are version complete
+ return tabletHealth;
} else if (aliveAndVersionComplete > replicationNum) {
if (needFurtherRepairReplica != null) {
- return Pair.of(TabletStatus.NEED_FURTHER_REPAIR,
TabletSchedCtx.Priority.HIGH);
+ tabletHealth.status = TabletStatus.NEED_FURTHER_REPAIR;
+ tabletHealth.priority = TabletSchedCtx.Priority.HIGH;
+ } else {
+ // we set REDUNDANT as VERY_HIGH, because delete redundant
replicas can free the space quickly.
+ tabletHealth.status = TabletStatus.REDUNDANT;
+ tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
}
- // we set REDUNDANT as VERY_HIGH, because delete redundant
replicas can free the space quickly.
- return Pair.of(TabletStatus.REDUNDANT,
TabletSchedCtx.Priority.VERY_HIGH);
+ return tabletHealth;
}
// 3. replica is under relocating
@@ -564,14 +623,17 @@ public class Tablet extends MetaObject implements
Writable {
if (replicaBeIds.containsAll(availableBeIds)
&& availableBeIds.size() >= replicationNum
&& replicationNum > 1) { // No BE can be choose to create
a new replica
- return Pair.of(TabletStatus.FORCE_REDUNDANT,
- stable < (replicationNum / 2) + 1
- ? TabletSchedCtx.Priority.NORMAL :
TabletSchedCtx.Priority.LOW);
+ tabletHealth.status = TabletStatus.FORCE_REDUNDANT;
+ tabletHealth.priority = stable < (replicationNum / 2) + 1
+ ? TabletSchedCtx.Priority.NORMAL :
TabletSchedCtx.Priority.LOW;
+ return tabletHealth;
}
- if (stable < (replicationNum / 2) + 1) {
- return Pair.of(TabletStatus.REPLICA_RELOCATING,
TabletSchedCtx.Priority.NORMAL);
- } else if (stable < replicationNum) {
- return Pair.of(TabletStatus.REPLICA_RELOCATING,
TabletSchedCtx.Priority.LOW);
+
+ if (stable < replicationNum) {
+ tabletHealth.status = TabletStatus.REPLICA_RELOCATING;
+ tabletHealth.priority = stable < (replicationNum / 2) + 1 ?
TabletSchedCtx.Priority.NORMAL
+ : TabletSchedCtx.Priority.LOW;
+ return tabletHealth;
}
}
@@ -579,19 +641,25 @@ public class Tablet extends MetaObject implements
Writable {
for (Map.Entry<Tag, Short> alloc : allocMap.entrySet()) {
if (stableVersionCompleteAllocMap.getOrDefault(alloc.getKey(),
(short) 0) < alloc.getValue()) {
if (stableAllocMap.getOrDefault(alloc.getKey(), (short) 0) >=
alloc.getValue()) {
- return Pair.of(TabletStatus.VERSION_INCOMPLETE,
TabletSchedCtx.Priority.NORMAL);
+ tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
} else {
- return Pair.of(TabletStatus.REPLICA_MISSING_FOR_TAG,
TabletSchedCtx.Priority.NORMAL);
+ tabletHealth.status = TabletStatus.REPLICA_MISSING_FOR_TAG;
}
+ tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;
+ return tabletHealth;
}
}
if (replicas.size() > replicationNum) {
if (needFurtherRepairReplica != null) {
- return Pair.of(TabletStatus.NEED_FURTHER_REPAIR,
TabletSchedCtx.Priority.HIGH);
+ tabletHealth.status = TabletStatus.NEED_FURTHER_REPAIR;
+ tabletHealth.priority = TabletSchedCtx.Priority.HIGH;
+ } else {
+ // we set REDUNDANT as VERY_HIGH, because delete redundant
replicas can free the space quickly.
+ tabletHealth.status = TabletStatus.REDUNDANT;
+ tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
}
- // we set REDUNDANT as VERY_HIGH, because delete redundant
replicas can free the space quickly.
- return Pair.of(TabletStatus.REDUNDANT,
TabletSchedCtx.Priority.VERY_HIGH);
+ return tabletHealth;
}
// 5. find a replica's version count is much more than others, and
drop it
@@ -603,12 +671,36 @@ public class Tablet extends MetaObject implements
Writable {
double ratio = (double) delta / versions.get(versions.size() - 1);
if (versions.get(versions.size() - 1) >=
Config.min_version_count_indicate_replica_compaction_too_slow
&& ratio >
Config.valid_version_count_delta_ratio_between_replicas) {
- return Pair.of(TabletStatus.REPLICA_COMPACTION_TOO_SLOW,
Priority.HIGH);
+ tabletHealth.status = TabletStatus.REPLICA_COMPACTION_TOO_SLOW;
+ tabletHealth.priority = Priority.HIGH;
+ return tabletHealth;
}
}
// 6. healthy
- return Pair.of(TabletStatus.HEALTHY, TabletSchedCtx.Priority.NORMAL);
+ tabletHealth.status = TabletStatus.HEALTHY;
+ tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;
+
+ return tabletHealth;
+ }
+
+ private void initTabletHealth(TabletHealth tabletHealth) {
+ long endTime = System.currentTimeMillis() -
Config.tablet_recent_load_failed_second * 1000L;
+ tabletHealth.hasRecentLoadFailed = lastLoadFailedTime > endTime;
+ tabletHealth.noPathForNewReplica = lastTimeNoPathForNewReplica >
endTime;
+ }
+
+ private boolean isReplicaAndBackendAlive(Replica replica, Backend backend,
Set<String> hosts) {
+ if (backend == null || !backend.isAlive() || !replica.isAlive()
+ || checkHost(hosts, backend) || replica.tooSlow() ||
!backend.isMixNode()) {
+ // this replica is not alive,
+ // or if this replica is on same host with another replica, we
also treat it as 'dead',
+ // so that Tablet Scheduler will create a new replica on different
host.
+ // ATTN: Replicas on same host is a bug of previous Doris version,
so we fix it by this way.
+ return false;
+ } else {
+ return true;
+ }
}
private boolean checkHost(Set<String> hosts, Backend backend) {
@@ -637,8 +729,49 @@ public class Tablet extends MetaObject implements Writable
{
* No need to check if backend is available. We consider all backends in
'backendsSet' are available,
* If not, unavailable backends will be relocated by CalocateTableBalancer
first.
*/
- public TabletStatus getColocateHealthStatus(long visibleVersion,
+ public TabletHealth getColocateHealth(long visibleVersion,
ReplicaAllocation replicaAlloc, Set<Long> backendsSet) {
+ SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
+ short replicationNum = replicaAlloc.getTotalReplicaNum();
+ boolean hasAliveAndVersionIncomplete = false;
+ int aliveAndVersionComplete = 0;
+ Set<String> hosts = Sets.newHashSet();
+ for (Replica replica : replicas) {
+ Backend backend =
systemInfoService.getBackend(replica.getBackendId());
+ if (!isReplicaAndBackendAlive(replica, backend, hosts)) {
+ continue;
+ }
+
+ boolean versionCompleted = replica.getLastFailedVersion() <= 0 &&
replica.getVersion() >= visibleVersion;
+ if (versionCompleted) { // same rule as the "lastFailedVersion > 0" incomplete check below
+ aliveAndVersionComplete++;
+ }
+
+ if (replica.isScheduleAvailable()) {
+ if (!versionCompleted) {
+ hasAliveAndVersionIncomplete = true;
+ }
+ }
+ }
+
+ TabletHealth tabletHealth = new TabletHealth();
+ initTabletHealth(tabletHealth);
+ tabletHealth.aliveAndVersionCompleteNum = aliveAndVersionComplete;
+ tabletHealth.hasAliveAndVersionIncomplete =
hasAliveAndVersionIncomplete;
+ tabletHealth.priority = TabletSchedCtx.Priority.NORMAL;
+
+ // 0. We can not choose a good replica as src to repair this tablet.
+ if (aliveAndVersionComplete == 0) {
+ tabletHealth.status = TabletStatus.UNRECOVERABLE;
+ return tabletHealth;
+ } else if (aliveAndVersionComplete < replicationNum &&
hasAliveAndVersionIncomplete) {
+ // not enough good replica, and there exists schedule available
replicas and version incomplete,
+ // no matter whether they tag is proper right, fix them
immediately.
+ tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
+ tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
+ return tabletHealth;
+ }
+
// Here we don't need to care about tag. Because the replicas of the
colocate table has been confirmed
// in ColocateTableCheckerAndBalancer.
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
@@ -647,7 +780,8 @@ public class Tablet extends MetaObject implements Writable {
// Because if the following check doesn't pass, the
COLOCATE_MISMATCH will return.
Set<Long> replicaBackendIds = getBackendIds();
if (!replicaBackendIds.containsAll(backendsSet)) {
- return TabletStatus.COLOCATE_MISMATCH;
+ tabletHealth.status = TabletStatus.COLOCATE_MISMATCH;
+ return tabletHealth;
}
// 2. check version completeness
@@ -663,27 +797,31 @@ public class Tablet extends MetaObject implements
Writable {
if (replica.isBad()) {
// If this replica is bad but located on one of
backendsSet,
// we have drop it first, or we can find any other BE for
new replica.
- return TabletStatus.COLOCATE_REDUNDANT;
+ tabletHealth.status = TabletStatus.COLOCATE_REDUNDANT;
} else {
// maybe in replica's DECOMMISSION state
// Here we return VERSION_INCOMPLETE,
// and the tablet scheduler will finally set it's state to
NORMAL.
- return TabletStatus.VERSION_INCOMPLETE;
+ tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
}
+ return tabletHealth;
}
if (replica.getLastFailedVersion() > 0 || replica.getVersion() <
visibleVersion) {
// this replica is alive but version incomplete
- return TabletStatus.VERSION_INCOMPLETE;
+ tabletHealth.status = TabletStatus.VERSION_INCOMPLETE;
+ return tabletHealth;
}
}
// 3. check redundant
if (replicas.size() > totalReplicaNum) {
- return TabletStatus.COLOCATE_REDUNDANT;
+ tabletHealth.status = TabletStatus.COLOCATE_REDUNDANT;
+ return tabletHealth;
}
- return TabletStatus.HEALTHY;
+ tabletHealth.status = TabletStatus.HEALTHY;
+ return tabletHealth;
}
/**
@@ -744,4 +882,16 @@ public class Tablet extends MetaObject implements Writable
{
public void setLastStatusCheckTime(long lastStatusCheckTime) {
this.lastStatusCheckTime = lastStatusCheckTime;
}
+
+ public long getLastLoadFailedTime() {
+ return lastLoadFailedTime;
+ }
+
+ public void setLastLoadFailedTime(long lastLoadFailedTime) {
+ this.lastLoadFailedTime = lastLoadFailedTime;
+ }
+
+ public void setLastTimeNoPathForNewReplica(long
lastTimeNoPathForNewReplica) {
+ this.lastTimeNoPathForNewReplica = lastTimeNoPathForNewReplica;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 292013ec05a..4febec9e922 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletChecker.CheckerCounter;
import org.apache.doris.clone.TabletSchedCtx.Priority;
@@ -334,7 +335,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
* tablet to TabletScheduler.
* Otherwise, mark the group as stable
*/
- protected void runAfterCatalogReady() {
+ public void runAfterCatalogReady() {
relocateAndBalanceGroups();
matchGroups();
}
@@ -512,6 +513,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
if (olapTable == null ||
!colocateIndex.isColocateTable(olapTable.getId())) {
continue;
}
+ boolean isUniqKeyMergeOnWrite =
olapTable.isUniqKeyMergeOnWrite();
olapTable.readLock();
try {
for (Partition partition : olapTable.getPartitions()) {
@@ -530,16 +532,20 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
Preconditions.checkState(bucketsSeq.size() ==
replicationNum,
bucketsSeq.size() + " vs. " +
replicationNum);
Tablet tablet = index.getTablet(tabletId);
- TabletStatus st =
tablet.getColocateHealthStatus(
+ TabletHealth tabletHealth =
tablet.getColocateHealth(
visibleVersion, replicaAlloc,
bucketsSeq);
- if (st != TabletStatus.HEALTHY) {
+ if (tabletHealth.status !=
TabletStatus.HEALTHY) {
counter.unhealthyTabletNum++;
unstableReason = String.format("get
unhealthy tablet %d in colocate table."
- + " status: %s", tablet.getId(),
st);
+ + " status: %s", tablet.getId(),
tabletHealth.status);
if (LOG.isDebugEnabled()) {
LOG.debug(unstableReason);
}
+ if (tabletHealth.status ==
TabletStatus.UNRECOVERABLE) {
+ continue;
+ }
+
if (!tablet.readyToBeRepaired(infoService,
Priority.NORMAL)) {
counter.tabletNotReady++;
continue;
@@ -550,9 +556,9 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
db.getId(), tableId,
partition.getId(), index.getId(), tablet.getId(),
replicaAlloc,
System.currentTimeMillis());
// the tablet status will be set again
when being scheduled
- tabletCtx.setTabletStatus(st);
- tabletCtx.setPriority(Priority.NORMAL);
+ tabletCtx.setTabletHealth(tabletHealth);
tabletCtx.setTabletOrderIdx(idx);
+
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
AddResult res =
tabletScheduler.addTablet(tabletCtx, false /* not force */);
if (res == AddResult.LIMIT_EXCEED || res
== AddResult.DISABLED) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index 0a23894ab3e..f35282d37b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletScheduler.AddResult;
import org.apache.doris.common.Config;
@@ -197,7 +198,7 @@ public class TabletChecker extends MasterDaemon {
* If a tablet is not healthy, a TabletInfo will be created and sent to
TabletScheduler for repairing.
*/
@Override
- protected void runAfterCatalogReady() {
+ public void runAfterCatalogReady() {
int pendingNum = tabletScheduler.getPendingNum();
int runningNum = tabletScheduler.getRunningNum();
if (pendingNum > Config.max_scheduling_tablets
@@ -357,6 +358,7 @@ public class TabletChecker extends MasterDaemon {
return LoopControlStatus.CONTINUE;
}
boolean prioPartIsHealthy = true;
+ boolean isUniqKeyMergeOnWrite = tbl.isUniqKeyMergeOnWrite();
/*
* Tablet in SHADOW index can not be repaired of balanced
*/
@@ -369,26 +371,25 @@ public class TabletChecker extends MasterDaemon {
continue;
}
- Pair<TabletStatus, TabletSchedCtx.Priority> statusWithPrio =
tablet.getHealthStatusWithPriority(
- infoService, partition.getVisibleVersion(),
+ TabletHealth tabletHealth = tablet.getHealth(infoService,
partition.getVisibleVersion(),
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()), aliveBeIds);
- if (statusWithPrio.first == TabletStatus.HEALTHY) {
+ if (tabletHealth.status == TabletStatus.HEALTHY) {
// Only set last status check time when status is healthy.
tablet.setLastStatusCheckTime(startTime);
continue;
- } else if (statusWithPrio.first == TabletStatus.UNRECOVERABLE)
{
+ } else if (tabletHealth.status == TabletStatus.UNRECOVERABLE) {
// This tablet is not recoverable, do not set it into
tablet scheduler
// all UNRECOVERABLE tablet can be seen from "show proc
'/statistic'"
counter.unhealthyTabletNum++;
continue;
} else if (isInPrios) {
- statusWithPrio.second = TabletSchedCtx.Priority.VERY_HIGH;
+ tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH;
prioPartIsHealthy = false;
}
counter.unhealthyTabletNum++;
- if (!tablet.readyToBeRepaired(infoService,
statusWithPrio.second)) {
+ if (!tablet.readyToBeRepaired(infoService,
tabletHealth.priority)) {
continue;
}
@@ -399,8 +400,8 @@ public class TabletChecker extends MasterDaemon {
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
System.currentTimeMillis());
// the tablet status will be set again when being scheduled
- tabletCtx.setTabletStatus(statusWithPrio.first);
- tabletCtx.setPriority(statusWithPrio.second);
+ tabletCtx.setTabletHealth(tabletHealth);
+ tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /*
not force */);
if (res == AddResult.LIMIT_EXCEED || res ==
AddResult.DISABLED) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index e676774afcf..d004d21f79c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
@@ -134,8 +135,6 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
private Type type;
private BalanceType balanceType;
- private Priority priority;
-
// we change the dynamic priority based on how many times it fails to be
scheduled
private int failedSchedCounter = 0;
// clone task failed counter
@@ -161,7 +160,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
private long taskTimeoutMs = 0;
private State state;
- private TabletStatus tabletStatus;
+ private TabletHealth tabletHealth;
private long decommissionTime = -1;
@@ -213,6 +212,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
private SubCode schedFailedCode;
+ private boolean isUniqKeyMergeOnWrite = false;
+
public TabletSchedCtx(Type type, long dbId, long tblId, long partId,
long idxId, long tabletId, ReplicaAllocation replicaAlloc, long
createTime) {
this.type = type;
@@ -227,6 +228,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
this.replicaAlloc = replicaAlloc;
this.balanceType = BalanceType.BE_BALANCE;
this.schedFailedCode = SubCode.NONE;
+ this.tabletHealth = new TabletHealth();
}
public ReplicaAllocation getReplicaAlloc() {
@@ -262,11 +264,19 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
public Priority getPriority() {
- return priority;
+ return tabletHealth.priority;
}
public void setPriority(Priority priority) {
- this.priority = priority;
+ this.tabletHealth.priority = priority;
+ }
+
+ public void setTabletHealth(TabletHealth tabletHealth) {
+ this.tabletHealth = tabletHealth;
+ }
+
+ public void setIsUniqKeyMergeOnWrite(boolean isUniqKeyMergeOnWrite) {
+ this.isUniqKeyMergeOnWrite = isUniqKeyMergeOnWrite;
}
public int getFinishedCounter() {
@@ -345,11 +355,11 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
public void setTabletStatus(TabletStatus tabletStatus) {
- this.tabletStatus = tabletStatus;
+ this.tabletHealth.status = tabletStatus;
}
public TabletStatus getTabletStatus() {
- return tabletStatus;
+ return tabletHealth.status;
}
public long getDbId() {
@@ -739,7 +749,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
if (replica.getLastFailedVersion() <= 0
&& replica.getVersion() >= visibleVersion) {
- if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR &&
replica.needFurtherRepair()) {
+ if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR &&
replica.needFurtherRepair()) {
furtherRepairs.add(replica);
}
@@ -1016,10 +1026,10 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// REPLICA_MISSING/REPLICA_RELOCATING,
// we create a new replica with state CLONE
Replica replica = null;
- if (tabletStatus == TabletStatus.REPLICA_MISSING
- || tabletStatus == TabletStatus.REPLICA_RELOCATING || type ==
Type.BALANCE
- || tabletStatus == TabletStatus.COLOCATE_MISMATCH
- || tabletStatus == TabletStatus.REPLICA_MISSING_FOR_TAG) {
+ if (tabletHealth.status == TabletStatus.REPLICA_MISSING
+ || tabletHealth.status == TabletStatus.REPLICA_RELOCATING ||
type == Type.BALANCE
+ || tabletHealth.status == TabletStatus.COLOCATE_MISMATCH
+ || tabletHealth.status ==
TabletStatus.REPLICA_MISSING_FOR_TAG) {
replica = new Replica(
Env.getCurrentEnv().getNextId(), destBackendId,
-1 /* version */, schemaHash,
@@ -1054,7 +1064,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
destOldVersion = replica.getVersion();
cloneTask.setPathHash(srcPathHash, destPathHash);
LOG.info("create clone task to repair replica, tabletId={},
replica={}, visible version {}, tablet status {}",
- tabletId, replica, visibleVersion, tabletStatus);
+ tabletId, replica, visibleVersion, tabletHealth.status);
this.state = State.RUNNING;
return cloneTask;
@@ -1062,7 +1072,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// for storage migration or cloning a new replica
public long getDestEstimatedCopingSize() {
- if ((cloneTask != null && tabletStatus !=
TabletStatus.VERSION_INCOMPLETE)
+ if ((cloneTask != null && tabletHealth.status !=
TabletStatus.VERSION_INCOMPLETE)
|| storageMediaMigrationTask != null) {
return Math.max(getTabletSize(), 10L);
} else {
@@ -1149,10 +1159,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
ReplicaAllocation replicaAlloc =
olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
- Pair<TabletStatus, TabletSchedCtx.Priority> pair =
tablet.getHealthStatusWithPriority(
- infoService, visibleVersion, replicaAlloc,
- aliveBeIds);
- if (pair.first == TabletStatus.HEALTHY) {
+ TabletStatus status = tablet.getHealth(infoService,
visibleVersion, replicaAlloc, aliveBeIds).status;
+ if (status == TabletStatus.HEALTHY) {
throw new SchedException(Status.FINISHED, "tablet is healthy");
}
@@ -1266,10 +1274,13 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
result.add(String.valueOf(tabletId));
result.add(type.name());
result.add(storageMedium == null ? FeConstants.null_string :
storageMedium.name());
- result.add(tabletStatus == null ? FeConstants.null_string :
tabletStatus.name());
+ result.add(tabletHealth.status == null ? FeConstants.null_string :
tabletHealth.status.name());
result.add(state.name());
result.add(schedFailedCode.name());
- result.add(priority.name());
+ result.add(tabletHealth.priority == null ? FeConstants.null_string :
tabletHealth.priority.name());
+ // show the real priority value, higher this value, higher sched
priority. Add 10 hour to make it
+ // to be a positive value.
+ result.add(String.valueOf((System.currentTimeMillis() -
getCompareValue()) / 1000 + 10 * 3600L));
result.add(srcReplica == null ? "-1" :
String.valueOf(srcReplica.getBackendId()));
result.add(String.valueOf(srcPathHash));
result.add(String.valueOf(destBackendId));
@@ -1299,26 +1310,44 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
return Long.compare(getCompareValue(), o.getCompareValue());
}
+ // smaller compare value, higher priority
private long getCompareValue() {
long value = createTime;
if (lastVisitedTime > 0) {
value = lastVisitedTime;
}
- value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60
* 1000L;
+ value += (Priority.VERY_HIGH.ordinal() -
tabletHealth.priority.ordinal() + 1) * 60 * 1000L;
value += 5000L * (failedSchedCounter / 10);
if (schedFailedCode == SubCode.WAITING_DECOMMISSION) {
value += 5 * 1000L;
}
+ long baseTime = Config.tablet_schedule_high_priority_second * 1000L;
// repair tasks always prior than balance
if (type == Type.BALANCE) {
- value += 5 * 3600 * 1000L; // 5 hour
+ value += 10 * baseTime;
+ } else {
+ int replicaNum = replicaAlloc.getTotalReplicaNum();
+ if (tabletHealth.aliveAndVersionCompleteNum < replicaNum &&
!tabletHealth.noPathForNewReplica) {
+ if (tabletHealth.aliveAndVersionCompleteNum < (replicaNum / 2
+ 1)) {
+ value -= 3 * baseTime;
+ if (tabletHealth.hasRecentLoadFailed) {
+ value -= 3 * baseTime;
+ }
+ }
+ if (tabletHealth.hasAliveAndVersionIncomplete) {
+ value -= 1 * baseTime;
+ if (isUniqKeyMergeOnWrite) {
+ value -= 1 * baseTime;
+ }
+ }
+ }
}
- if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR) {
- value -= 3600 * 1000L; // 1 hour
+ if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR) {
+ value -= 1 * baseTime;
}
return value;
@@ -1328,8 +1357,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("tablet id: ").append(tabletId);
- if (tabletStatus != null) {
- sb.append(", status: ").append(tabletStatus.name());
+ if (tabletHealth.status != null) {
+ sb.append(", status: ").append(tabletHealth.status.name());
}
if (state != null) {
sb.append(", state: ").append(state.name());
@@ -1340,9 +1369,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
if (type == Type.BALANCE && balanceType != null) {
sb.append(", balance: ").append(balanceType.name());
}
- if (priority != null) {
- sb.append(", priority: ").append(priority.name());
- }
+ sb.append(", priority: ").append(tabletHealth.priority.name());
sb.append(", tablet size: ").append(tabletSize);
if (srcReplica != null) {
sb.append(", from backend: ").append(srcReplica.getBackendId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 10a39af8a15..a3a3a93e0fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -34,6 +34,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
@@ -45,7 +46,6 @@ import org.apache.doris.clone.TabletSchedCtx.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.ReplicaPersistInfo;
@@ -167,6 +167,17 @@ public class TabletScheduler extends MasterDaemon {
this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex,
backendsWorkingSlots);
}
+ // for fe ut
+ public synchronized void clear() {
+ pendingTablets.clear();
+ allTabletTypes.clear();
+ runningTablets.clear();
+ schedHistory.clear();
+
+ lastStatUpdateTime = 0;
+ lastSlotAdjustTime = 0;
+ }
+
public TabletSchedulerStat getStat() {
return stat;
}
@@ -322,7 +333,7 @@ public class TabletScheduler extends MasterDaemon {
*
*/
@Override
- protected void runAfterCatalogReady() {
+ public void runAfterCatalogReady() {
if (!updateWorkingSlots()) {
return;
}
@@ -481,7 +492,7 @@ public class TabletScheduler extends MasterDaemon {
tabletCtx.setLastVisitedTime(currentTime);
stat.counterTabletScheduled.incrementAndGet();
- Pair<TabletStatus, TabletSchedCtx.Priority> statusPair;
+ TabletHealth tabletHealth;
Database db =
Env.getCurrentInternalCatalog().getDbOrException(tabletCtx.getDbId(),
s -> new SchedException(Status.UNRECOVERABLE,
SubCode.DIAGNOSE_IGNORE,
"db " + tabletCtx.getDbId() + " does not exist"));
@@ -530,15 +541,13 @@ public class TabletScheduler extends MasterDaemon {
Preconditions.checkState(tabletOrderIdx != -1);
Set<Long> backendsSet =
colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
- TabletStatus st = tablet.getColocateHealthStatus(
- partition.getVisibleVersion(), replicaAlloc,
backendsSet);
- statusPair = Pair.of(st, Priority.HIGH);
+ tabletHealth =
tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc,
backendsSet);
+ tabletHealth.priority = Priority.HIGH;
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
replicaAlloc =
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
- statusPair = tablet.getHealthStatusWithPriority(
- infoService, partition.getVisibleVersion(),
replicaAlloc, aliveBeIds);
+ tabletHealth = tablet.getHealth(infoService,
partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
}
if (tabletCtx.getType() != allTabletTypes.get(tabletId)) {
@@ -576,7 +585,7 @@ public class TabletScheduler extends MasterDaemon {
}
}
- if (statusPair.first != TabletStatus.VERSION_INCOMPLETE
+ if (tabletHealth.status != TabletStatus.VERSION_INCOMPLETE
&& (partition.getState() != PartitionState.NORMAL ||
tableState != OlapTableState.NORMAL)
&& tableState != OlapTableState.WAITING_STABLE) {
// If table is under ALTER process(before FINISHING), do not
allow to add or delete replica.
@@ -585,13 +594,14 @@ public class TabletScheduler extends MasterDaemon {
// executing an alter job, but the alter job is in a PENDING
state and is waiting for
// the table to become stable. In this case, we allow the
tablet repair to proceed.
throw new SchedException(Status.UNRECOVERABLE,
SubCode.DIAGNOSE_IGNORE,
- "table is in alter process, but tablet status is " +
statusPair.first.name());
+ "table is in alter process, but tablet status is " +
tabletHealth.status.name());
}
- tabletCtx.setTabletStatus(statusPair.first);
- if (statusPair.first == TabletStatus.HEALTHY &&
tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) {
+ tabletCtx.setTabletHealth(tabletHealth);
+ tabletCtx.setIsUniqKeyMergeOnWrite(tbl.isUniqKeyMergeOnWrite());
+ if (tabletHealth.status == TabletStatus.HEALTHY &&
tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) {
throw new SchedException(Status.UNRECOVERABLE,
SubCode.DIAGNOSE_IGNORE, "tablet is healthy");
- } else if (statusPair.first != TabletStatus.HEALTHY
+ } else if (tabletHealth.status != TabletStatus.HEALTHY
&& tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) {
// we select an unhealthy tablet to do balance, which is not
right.
// so here we stop this task.
@@ -612,7 +622,7 @@ public class TabletScheduler extends MasterDaemon {
tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId()));
tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium());
- handleTabletByTypeAndStatus(statusPair.first, tabletCtx,
batchTask);
+ handleTabletByTypeAndStatus(tabletHealth.status, tabletCtx,
batchTask);
} finally {
tbl.writeUnlock();
}
@@ -1379,6 +1389,24 @@ public class TabletScheduler extends MasterDaemon {
// if forColocate is false, the tag must be set.
private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx
tabletCtx, Tag tag, boolean forColocate)
throws SchedException {
+ boolean noPathForNewReplica = false;
+ try {
+ return doChooseAvailableDestPath(tabletCtx, tag, forColocate);
+ } catch (SchedException e) {
+ if (e.getStatus() == Status.UNRECOVERABLE) {
+ noPathForNewReplica = true;
+ }
+ throw e;
+ } finally {
+ Tablet tablet = tabletCtx.getTablet();
+ if (tablet != null) {
+ tablet.setLastTimeNoPathForNewReplica(noPathForNewReplica ?
System.currentTimeMillis() : -1L);
+ }
+ }
+ }
+
+ private RootPathLoadStatistic doChooseAvailableDestPath(TabletSchedCtx
tabletCtx, Tag tag, boolean forColocate)
+ throws SchedException {
List<BackendLoadStatistic> beStatistics;
if (tag != null) {
Preconditions.checkState(!forColocate);
@@ -1552,13 +1580,11 @@ public class TabletScheduler extends MasterDaemon {
}
}
-
private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
Preconditions.checkState(tabletCtx.getState() ==
TabletSchedCtx.State.PENDING);
addTablet(tabletCtx, true /* force */);
}
-
private void finalizeTabletCtx(TabletSchedCtx tabletCtx,
TabletSchedCtx.State state, Status status, String reason) {
if (state == TabletSchedCtx.State.CANCELLED || state ==
TabletSchedCtx.State.UNEXPECTED) {
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE
@@ -1595,8 +1621,6 @@ public class TabletScheduler extends MasterDaemon {
if (tbl == null) {
return;
}
- Pair<TabletStatus, TabletSchedCtx.Priority> statusPair;
- ReplicaAllocation replicaAlloc = null;
tbl.readLock();
try {
Partition partition = tbl.getPartition(tabletCtx.getPartitionId());
@@ -1614,67 +1638,79 @@ public class TabletScheduler extends MasterDaemon {
return;
}
- boolean isColocateTable =
colocateTableIndex.isColocateTable(tbl.getId());
- if (isColocateTable) {
- GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
- if (groupId == null) {
- return;
- }
- ColocateGroupSchema groupSchema =
colocateTableIndex.getGroupSchema(groupId);
- if (groupSchema == null) {
- return;
- }
+ tryAddRepairTablet(tablet, tabletCtx.getDbId(), tbl, partition,
idx, finishedCounter);
+ } finally {
+ tbl.readUnlock();
+ }
+ }
- replicaAlloc = groupSchema.getReplicaAlloc();
- int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
- if (tabletOrderIdx == -1) {
- tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
- }
- Preconditions.checkState(tabletOrderIdx != -1);
+ public void tryAddRepairTablet(Tablet tablet, long dbId, OlapTable table,
Partition partition,
+ MaterializedIndex idx, int finishedCounter) {
+ if (Config.disable_tablet_scheduler) {
+ return;
+ }
- Set<Long> backendsSet =
colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
- TabletStatus st = tablet.getColocateHealthStatus(
- partition.getVisibleVersion(), replicaAlloc,
backendsSet);
- statusPair = Pair.of(st, Priority.HIGH);
- tabletCtx.setColocateGroupBackendIds(backendsSet);
- } else {
- replicaAlloc =
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
- List<Long> aliveBeIds = infoService.getAllBackendIds(true);
- statusPair = tablet.getHealthStatusWithPriority(
- infoService, partition.getVisibleVersion(),
replicaAlloc, aliveBeIds);
+ TabletHealth tabletHealth;
+ ReplicaAllocation replicaAlloc;
+ Set<Long> colocateBackendIds = null;
+ boolean isColocateTable =
colocateTableIndex.isColocateTable(table.getId());
+ if (isColocateTable) {
+ GroupId groupId = colocateTableIndex.getGroup(table.getId());
+ if (groupId == null) {
+ return;
+ }
+ ColocateGroupSchema groupSchema =
colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ return;
+ }
- if (statusPair.second.ordinal() <
tabletCtx.getPriority().ordinal()) {
- statusPair.second = tabletCtx.getPriority();
- }
+ replicaAlloc = groupSchema.getReplicaAlloc();
+ int tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
+ if (tabletOrderIdx == -1) {
+ LOG.warn("Unknow colocate tablet order idx: group {}, table
{}, partition {}, index {}, tablet {}",
+ groupId, table.getId(), partition.getId(),
idx.getId(), tablet.getId());
+ return;
}
- if (statusPair.first == TabletStatus.NEED_FURTHER_REPAIR) {
+ colocateBackendIds =
colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
+ tabletHealth =
tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc,
colocateBackendIds);
+ tabletHealth.priority = Priority.HIGH;
+ } else {
+ replicaAlloc =
table.getPartitionInfo().getReplicaAllocation(partition.getId());
+ List<Long> aliveBeIds = infoService.getAllBackendIds(true);
+ tabletHealth = tablet.getHealth(infoService,
partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
+ }
+
+ if (tabletHealth.status == TabletStatus.HEALTHY || tabletHealth.status
== TabletStatus.UNRECOVERABLE) {
+ return;
+ }
+
+ // first time found this tablet is unhealthy
+ if (finishedCounter == 0) {
+ if (!tablet.readyToBeRepaired(Env.getCurrentSystemInfo(),
tabletHealth.priority)) {
+ return;
+ }
+ } else {
+ if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR) {
// replica is just waiting for finishing txns before
furtherRepairWatermarkTxnTd,
- // no need to add it immediately
- Replica replica =
tablet.getReplicaByBackendId(tabletCtx.getDestBackendId());
+ // no need to re add it immediately, can wait a little
+ Replica replica =
tablet.getReplicaById(tabletHealth.needFurtherRepairReplicaId);
if (replica != null && replica.getVersion() >=
partition.getVisibleVersion()
&& replica.getLastFailedVersion() < 0) {
return;
}
}
- } finally {
- tbl.readUnlock();
}
- if (statusPair.first == TabletStatus.HEALTHY) {
- return;
- }
+ TabletSchedCtx tabletCtx = new
TabletSchedCtx(TabletSchedCtx.Type.REPAIR, dbId, table.getId(),
+ partition.getId(), idx.getId(), tablet.getId(), replicaAlloc,
System.currentTimeMillis());
- TabletSchedCtx newTabletCtx = new TabletSchedCtx(
- TabletSchedCtx.Type.REPAIR, tabletCtx.getDbId(),
tabletCtx.getTblId(),
- tabletCtx.getPartitionId(), tabletCtx.getIndexId(),
tabletCtx.getTabletId(),
- replicaAlloc, System.currentTimeMillis());
-
- newTabletCtx.setTabletStatus(statusPair.first);
- newTabletCtx.setPriority(statusPair.second);
- newTabletCtx.setFinishedCounter(finishedCounter);
+ tabletCtx.setTabletHealth(tabletHealth);
+ tabletCtx.setFinishedCounter(finishedCounter);
+ tabletCtx.setColocateGroupBackendIds(colocateBackendIds);
+ tabletCtx.setIsUniqKeyMergeOnWrite(table.isUniqKeyMergeOnWrite());
- addTablet(newTabletCtx, false);
+ addTablet(tabletCtx, false);
}
private void releaseTabletCtx(TabletSchedCtx tabletCtx,
TabletSchedCtx.State state, boolean resetReplicaState) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index e28c74c327e..8aed3600741 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -29,10 +29,8 @@ import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Tablet;
-import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
-import org.apache.doris.common.Pair;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
@@ -181,6 +179,7 @@ public class TabletHealthProcDir implements
ProcDirInterface {
olapTable.readLock();
try {
for (Partition partition : olapTable.getAllPartitions()) {
+ long visibleVersion = partition.getVisibleVersion();
ReplicaAllocation replicaAlloc =
olapTable.getPartitionInfo()
.getReplicaAllocation(partition.getId());
for (MaterializedIndex materializedIndex :
partition.getMaterializedIndices(
@@ -196,13 +195,10 @@ public class TabletHealthProcDir implements
ProcDirInterface {
replicaAlloc =
groupSchema.getReplicaAlloc();
}
Set<Long> backendsSet =
colocateTableIndex.getTabletBackendsByGroup(groupId, i);
- res =
tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc,
- backendsSet);
+ res =
tablet.getColocateHealth(visibleVersion, replicaAlloc, backendsSet).status;
} else {
- Pair<Tablet.TabletStatus,
TabletSchedCtx.Priority> pair
- =
tablet.getHealthStatusWithPriority(infoService,
- partition.getVisibleVersion(),
replicaAlloc, aliveBeIds);
- res = pair.first;
+ res = tablet.getHealth(infoService,
visibleVersion, replicaAlloc,
+ aliveBeIds).status;
}
switch (res) { // CHECKSTYLE IGNORE THIS LINE:
missing switch default
case HEALTHY:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
index 67e24870f9f..060516eb7ba 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
@@ -35,7 +35,8 @@ import java.util.List;
*/
public class TabletSchedulerDetailProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>().add("TabletId")
-
.add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
+
.add("Type").add("Medium").add("Status").add("State").add("SchedCode")
+ .add("Priority").add("RealPriorityVal").add("SrcBe")
.add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
.add("Finished").add("ReplicaSize").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
.add("CmtVer").add("ErrMsg")
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 635a8bb675f..de56b4ce9b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -38,7 +38,6 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
-import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
@@ -1252,8 +1251,7 @@ public class ReportHandler extends Daemon {
if (groupSchema != null) {
replicaAlloc = groupSchema.getReplicaAlloc();
}
- TabletStatus status =
- tablet.getColocateHealthStatus(visibleVersion,
replicaAlloc, backendsSet);
+ TabletStatus status = tablet.getColocateHealth(visibleVersion,
replicaAlloc, backendsSet).status;
if (status == TabletStatus.HEALTHY) {
return false;
}
@@ -1265,8 +1263,7 @@ public class ReportHandler extends Daemon {
SystemInfoService infoService = Env.getCurrentSystemInfo();
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
- Pair<TabletStatus, TabletSchedCtx.Priority> status =
tablet.getHealthStatusWithPriority(infoService,
- visibleVersion, replicaAlloc, aliveBeIds);
+ TabletStatus status = tablet.getHealth(infoService,
visibleVersion, replicaAlloc, aliveBeIds).status;
// FORCE_REDUNDANT is a specific missing case.
// So it can add replica when it's in FORCE_REDUNDANT.
@@ -1275,16 +1272,16 @@ public class ReportHandler extends Daemon {
// it's safe to add this replica.
// Because if the tablet scheduler want to delete a replica, it
will choose the sched
// unavailable replica and avoid the repeating loop as above.
- boolean canAddForceRedundant = status.first ==
TabletStatus.FORCE_REDUNDANT
+ boolean canAddForceRedundant = status ==
TabletStatus.FORCE_REDUNDANT
&& infoService.checkBackendScheduleAvailable(backendId)
&& tablet.getReplicas().stream().anyMatch(
r ->
!infoService.checkBackendScheduleAvailable(r.getBackendId()));
if (isColocateBackend
|| canAddForceRedundant
- || status.first == TabletStatus.VERSION_INCOMPLETE
- || status.first == TabletStatus.REPLICA_MISSING
- || status.first == TabletStatus.UNRECOVERABLE) {
+ || status == TabletStatus.VERSION_INCOMPLETE
+ || status == TabletStatus.REPLICA_MISSING
+ || status == TabletStatus.UNRECOVERABLE) {
long lastFailedVersion = -1L;
// For some partition created by old version's Doris
@@ -1360,7 +1357,7 @@ public class ReportHandler extends Daemon {
LOG.info("add replica[{}-{}] to catalog. backend[{}], tablet
status {}, tablet size {}, "
+ "is colocate backend {}",
- tabletId, replicaId, backendId, status.first.name(),
tablet.getReplicas().size(),
+ tabletId, replicaId, backendId, status.name(),
tablet.getReplicas().size(),
isColocateBackend);
return true;
} else {
@@ -1374,7 +1371,7 @@ public class ReportHandler extends Daemon {
}
LOG.warn("no add replica [{}-{}] cause it is enough[{}-{}],
tablet status {}",
tabletId, replicaId, tablet.getReplicas().size(),
replicaAlloc.toCreateStmt(),
- status.first.name());
+ status.name());
return false;
}
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 04065db8d32..0dcbfee9c4b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -177,7 +177,7 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
database.getId(), olapTableSink.getDstTable(),
analyzer));
dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
List<TOlapTableLocationParam> locationParams = olapTableSink
- .createLocation(olapTableSink.getDstTable());
+ .createLocation(database.getId(),
olapTableSink.getDstTable());
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index e3195eec135..4196748cb8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -213,7 +213,7 @@ public class OlapTableSink extends DataSink {
tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer));
tSink.setPartition(createPartition(tSink.getDbId(), dstTable,
analyzer));
- List<TOlapTableLocationParam> locationParams =
createLocation(dstTable);
+ List<TOlapTableLocationParam> locationParams =
createLocation(tSink.getDbId(), dstTable);
tSink.setLocation(locationParams.get(0));
if (singleReplicaLoad) {
tSink.setSlaveLocation(locationParams.get(1));
@@ -604,7 +604,7 @@ public class OlapTableSink extends DataSink {
return Arrays.asList(locationParam, slaveLocationParam);
}
- public List<TOlapTableLocationParam> createLocation(OlapTable table)
throws UserException {
+ public List<TOlapTableLocationParam> createLocation(long dbId, OlapTable
table) throws UserException {
if (table.getPartitionInfo().enableAutomaticPartition() &&
partitionIds.isEmpty()) {
return createDummyLocation(table);
}
@@ -622,6 +622,13 @@ public class OlapTableSink extends DataSink {
for (Tablet tablet : index.getTablets()) {
Multimap<Long, Long> bePathsMap =
tablet.getNormalReplicaBackendPathMap();
if (bePathsMap.keySet().size() < loadRequiredReplicaNum) {
+ long now = System.currentTimeMillis();
+ long lastLoadFailedTime =
tablet.getLastLoadFailedTime();
+ tablet.setLastLoadFailedTime(now);
+ if (now - lastLoadFailedTime >= 5000L) {
+
Env.getCurrentEnv().getTabletScheduler().tryAddRepairTablet(
+ tablet, dbId, table, partition, index, 0);
+ }
throw new
UserException(InternalErrorCode.REPLICA_FEW_ERR,
"tablet " + tablet.getId() + " alive replica
num " + bePathsMap.keySet().size()
+ " < load required replica num " +
loadRequiredReplicaNum
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index db34dc2be3c..cc4ba211084 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -621,6 +621,14 @@ public class DatabaseTransactionMgr {
int successReplicaNum = tabletSuccReplicas.size();
if (successReplicaNum < loadRequiredReplicaNum) {
+ long now = System.currentTimeMillis();
+ long lastLoadFailedTime =
tablet.getLastLoadFailedTime();
+ tablet.setLastLoadFailedTime(now);
+ if (now - lastLoadFailedTime >= 5000L) {
+
Env.getCurrentEnv().getTabletScheduler().tryAddRepairTablet(
+ tablet, db.getId(), table, partition,
index, 0);
+ }
+
String writeDetail =
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
@@ -1271,6 +1279,7 @@ public class DatabaseTransactionMgr {
transactionState, tablet.getId(),
newVersion, loadRequiredReplicaNum, tableId,
partitionId,
partition.getCommittedVersion(), writeDetail));
}
+ tablet.setLastLoadFailedTime(-1L);
continue;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index d7fdb2694a8..3d54a772853 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -20,6 +20,8 @@ package org.apache.doris.catalog;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.Sets;
@@ -42,6 +44,7 @@ public class TabletTest {
private Replica replica3;
private TabletInvertedIndex invertedIndex;
+ private SystemInfoService infoService;
@Mocked
private Env env;
@@ -49,6 +52,12 @@ public class TabletTest {
@Before
public void makeTablet() {
invertedIndex = new TabletInvertedIndex();
+ infoService = new SystemInfoService();
+ for (long beId = 1L; beId <= 4L; beId++) {
+ Backend be = new Backend(beId, "127.0.0." + beId, 8030);
+ be.setAlive(true);
+ infoService.addBackend(be);
+ }
new Expectations(env) {
{
Env.getCurrentEnvJournalVersion();
@@ -59,6 +68,10 @@ public class TabletTest {
minTimes = 0;
result = invertedIndex;
+ Env.getCurrentSystemInfo();
+ minTimes = 0;
+ result = infoService;
+
Env.isCheckpointThread();
minTimes = 0;
result = false;
@@ -170,8 +183,8 @@ public class TabletTest {
tablet.addReplica(new Replica(replicaId++, pair.first,
versionAndSuccessVersion, 0,
200000L, 0, 3000L, ReplicaState.NORMAL, lastFailVersion,
versionAndSuccessVersion));
}
- Assert.assertEquals(tablet.getColocateHealthStatus(100L, new
ReplicaAllocation((short) 3),
- Sets.newHashSet(1L, 2L, 3L)), exceptedTabletStatus);
+ Assert.assertEquals(tablet.getColocateHealth(100L, new
ReplicaAllocation((short) 3),
+ Sets.newHashSet(1L, 2L, 3L)).status, exceptedTabletStatus);
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java
new file mode 100644
index 00000000000..b22925e5d89
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TabletHealthTest extends TestWithFeService {
+
+ private Database db;
+
+ @Override
+ protected void beforeCreatingConnectContext() throws Exception {
+ Config.enable_debug_points = true;
+ Config.disable_balance = true;
+ Config.disable_colocate_balance_between_groups = true;
+ Config.drop_backend_after_decommission = false;
+ Config.colocate_group_relocate_delay_second = -1000; // be dead will
imm relocate
+ Config.tablet_schedule_interval_ms = 7200_000L; //disable schedule
+ Config.tablet_checker_interval_ms = 7200_000L; //disable checker
+ }
+
+ @Override
+ protected int backendNum() {
+ return 3;
+ }
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ Thread.sleep(1000);
+ createDatabase("test");
+ useDatabase("test");
+ db = Env.getCurrentInternalCatalog().getDbOrMetaException("test");
+ }
+
+ @Override
+ protected void runBeforeEach() throws Exception {
+ for (Table table : db.getTables()) {
+ dropTable(table.getName(), true);
+ }
+ for (Backend be :
Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG)) {
+ be.setDecommissioned(false);
+ }
+ Env.getCurrentEnv().getTabletScheduler().clear();
+ DebugPointUtil.clearDebugPoints();
+
Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG)));
+ }
+
+ private void shutdownBackends(List<Long> backendIds) throws Exception {
+ if (backendIds.isEmpty()) {
+ return;
+ }
+ Map<String, String> params = Maps.newHashMap();
+ params.put("deadBeIds", Joiner.on(",").join(backendIds));
+
DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler",
params);
+ List<Backend> backends = backendIds.stream().map(beId ->
Env.getCurrentSystemInfo().getBackend(beId))
+ .collect(Collectors.toList());
+ Assertions.assertTrue(checkBELostHeartbeat(backends));
+ }
+
+ private void doRepair() throws Exception {
+ RebalancerTestUtil.updateReplicaPathHash();
+ for (int i = 0; i < 10; i++) {
+ Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady();
+
ColocateTableCheckerAndBalancer.getInstance().runAfterCatalogReady();
+ if (Env.getCurrentEnv().getTabletScheduler().getPendingNum() == 0)
{
+ break;
+ }
+
+ Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady();
+ Thread.sleep(500);
+ }
+ }
+
+ private void checkTabletStatus(Tablet tablet, TabletStatus status,
+ OlapTable table, Partition partition) throws Exception {
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+ ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
+ TabletHealth health;
+ if (colocateTableIndex.isColocateTable(table.getId())) {
+ GroupId groupId = colocateTableIndex.getGroup(table.getId());
+ ReplicaAllocation replicaAlloc =
colocateTableIndex.getGroupSchema(groupId).getReplicaAlloc();
+ Set<Long> colocateBackends =
colocateTableIndex.getTabletBackendsByGroup(groupId, 0);
+ health = tablet.getColocateHealth(partition.getVisibleVersion(),
replicaAlloc, colocateBackends);
+ } else {
+ ReplicaAllocation replicaAlloc =
table.getPartitionInfo().getReplicaAllocation(partition.getId());
+ health = tablet.getHealth(infoService,
partition.getVisibleVersion(),
+ replicaAlloc, infoService.getAllBackendIds(true));
+ }
+ Assertions.assertEquals(status, health.status);
+ }
+
+ private void checkTabletIsHealth(Tablet tablet, OlapTable table, Partition
partition) throws Exception {
+ checkTabletStatus(tablet, TabletStatus.HEALTHY, table, partition);
+ ReplicaAllocation replicaAlloc =
table.getPartitionInfo().getReplicaAllocation(partition.getId());
+ Assertions.assertEquals((int) replicaAlloc.getTotalReplicaNum(),
tablet.getReplicas().size());
+ for (Replica replica : tablet.getReplicas()) {
+ Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
+ Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+ Assertions.assertTrue(replica.isScheduleAvailable());
+ Assertions.assertTrue(replica.isAlive());
+ }
+ ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
+ if (colocateTableIndex.isColocateTable(table.getId())) {
+ GroupId groupId = colocateTableIndex.getGroup(table.getId());
+
Assertions.assertFalse(colocateTableIndex.isGroupUnstable(groupId));
+ }
+ }
+
+ @Test
+ public void testTabletHealth() throws Exception {
+ createTable("CREATE TABLE tbl1 (k INT) DISTRIBUTED BY HASH(k) BUCKETS
1"
+ + " PROPERTIES ('replication_num' = '3')");
+
+ OlapTable table = (OlapTable) db.getTableOrMetaException("tbl1");
+ Partition partition = table.getPartitions().iterator().next();
+ Tablet tablet =
partition.getMaterializedIndices(IndexExtState.ALL).iterator().next()
+ .getTablets().iterator().next();
+
+ partition.updateVisibleVersion(10L);
+ tablet.getReplicas().forEach(replica -> replica.updateVersion(10L));
+
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
+ // 1 replica miss version
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+
tablet.deleteReplicaByBackendId(tablet.getReplicas().get(2).getBackendId());
+ Assertions.assertEquals(2, tablet.getReplicas().size());
+ // 1 replica miss version, 1 replica lost
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
+ tablet.getReplicas().get(1).setBad(true);
+ // 1 replica miss version, 1 replica bad
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+
tablet.deleteReplicaByBackendId(tablet.getReplicas().get(2).getBackendId());
+ Assertions.assertEquals(2, tablet.getReplicas().size());
+ // 1 replica lost
+ checkTabletStatus(tablet, TabletStatus.REPLICA_MISSING, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(2).setBad(true);
+ // 1 replica bad
+ checkTabletStatus(tablet, TabletStatus.FORCE_REDUNDANT, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
+ Assertions.assertEquals(8L, tablet.getReplicas().get(0).getVersion());
+ // 1 replica miss version
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+
+
shutdownBackends(Lists.newArrayList(tablet.getReplicas().get(2).getBackendId()));
+ // 1 replica miss version, 1 replica dead
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+ doRepair();
+ Assertions.assertEquals(10L, tablet.getReplicas().get(0).getVersion());
+ // 1 replica dead
+ checkTabletStatus(tablet, TabletStatus.REPLICA_MISSING, table,
partition);
+
+ // be alive again
+ DebugPointUtil.clearDebugPoints();
+
Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG)));
+
+ alterTableSync("ALTER TABLE tbl1 MODIFY PARTITION(*) SET
('replication_num' = '2')");
+ ReplicaAllocation replicaAlloc =
table.getPartitionInfo().getReplicaAllocation(partition.getId());
+ Assertions.assertEquals(2, (int) replicaAlloc.getTotalReplicaNum());
+
+ checkTabletStatus(tablet, TabletStatus.REDUNDANT, table, partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(1).setBad(true);
+ // 1 replica bad
+ checkTabletStatus(tablet, TabletStatus.REPLICA_MISSING, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ Backend decommissionBe =
Env.getCurrentSystemInfo().getBackend(tablet.getReplicas().get(0).getBackendId());
+ decommissionBe.setDecommissioned(true);
+ // 1 replica's backend is decommission
+ checkTabletStatus(tablet, TabletStatus.REPLICA_RELOCATING, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+ decommissionBe.setDecommissioned(false);
+
+
shutdownBackends(Lists.newArrayList(tablet.getReplicas().get(1).getBackendId()));
+ // 1 replica dead
+ checkTabletStatus(tablet, TabletStatus.FORCE_REDUNDANT, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ shutdownBackends(Lists.newArrayList(tablet.getBackendIds()));
+ doRepair();
+ // all replica dead
+ checkTabletStatus(tablet, TabletStatus.UNRECOVERABLE, table,
partition);
+ Assertions.assertEquals(0,
Env.getCurrentEnv().getTabletScheduler().getPendingNum());
+
+ dropTable(table.getName(), true);
+ }
+
+ @Test
+ public void testColocateTabletHealth() throws Exception {
+ createTable("CREATE TABLE tbl2 (k INT) DISTRIBUTED BY HASH(k) BUCKETS
1"
+ + " PROPERTIES ('replication_num' = '3', 'colocate_with' =
'foo')");
+
+ OlapTable table = (OlapTable) db.getTableOrMetaException("tbl2");
+ Partition partition = table.getPartitions().iterator().next();
+ Tablet tablet =
partition.getMaterializedIndices(IndexExtState.ALL).iterator().next()
+ .getTablets().iterator().next();
+
+
Assertions.assertTrue(Env.getCurrentColocateIndex().isColocateTable(table.getId()));
+
+ partition.updateVisibleVersion(10L);
+ tablet.getReplicas().forEach(replica -> replica.updateVersion(10L));
+
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
+ // 1 replica miss version
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+
+
tablet.deleteReplicaByBackendId(tablet.getReplicas().get(2).getBackendId());
+ Assertions.assertEquals(2, tablet.getReplicas().size());
+ // 1 replica miss version, 1 replica lost
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+
tablet.deleteReplicaByBackendId(tablet.getReplicas().get(2).getBackendId());
+ Assertions.assertEquals(2, tablet.getReplicas().size());
+ // 1 replica lost
+ checkTabletStatus(tablet, TabletStatus.COLOCATE_MISMATCH, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L);
+ Assertions.assertEquals(8L, tablet.getReplicas().get(0).getVersion());
+ // 1 replica miss version
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+ tablet.getReplicas().get(2).setBad(true);
+ // 1 replica miss version, 1 replica bad
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(2).setBad(true);
+ // 1 replica bad
+ checkTabletStatus(tablet, TabletStatus.COLOCATE_REDUNDANT, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ Assertions.assertNotNull(getSqlStmtExecutor("ALTER COLOCATE GROUP foo
SET ('replication_num' = '2')"));
+ ReplicaAllocation replicaAlloc =
table.getPartitionInfo().getReplicaAllocation(partition.getId());
+ Assertions.assertEquals(2, (int) replicaAlloc.getTotalReplicaNum());
+
+ checkTabletStatus(tablet, TabletStatus.COLOCATE_REDUNDANT, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ tablet.getReplicas().get(1).setBad(true);
+ // 1 replica bad, first delete it, then re-add it
+ checkTabletStatus(tablet, TabletStatus.COLOCATE_REDUNDANT, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ long deleteBeId = tablet.getReplicas().get(1).getBackendId();
+ shutdownBackends(Lists.newArrayList(deleteBeId));
+ ColocateTableCheckerAndBalancer.getInstance().runAfterCatalogReady();
// colocate relocate
+ // 1 replica dead
+ checkTabletStatus(tablet, TabletStatus.COLOCATE_MISMATCH, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+
+ // be alive again
+ DebugPointUtil.clearDebugPoints();
+
Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG)));
+
+ // temporary delete replica 1
+ tablet.deleteReplica(tablet.getReplicas().get(1));
+ Assertions.assertFalse(tablet.getBackendIds().contains(deleteBeId));
+ Replica replica = new Replica(1234567890L, deleteBeId,
Replica.ReplicaState.NORMAL, 8L, 0);
+ // add a `error` replica on other backend
+ tablet.addReplica(replica);
+ // colocate don't relocate because no be dead
+ ColocateTableCheckerAndBalancer.getInstance().runAfterCatalogReady();
+ // first repair the replica on deleteBeId, then add a new replica on
the located backend,
+ // then drop the replica on deleteBeId
+ checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+ Assertions.assertFalse(tablet.getBackendIds().contains(deleteBeId));
+
+ Backend decommissionBe =
Env.getCurrentSystemInfo().getBackend(tablet.getReplicas().get(0).getBackendId());
+ decommissionBe.setDecommissioned(true);
+ // 1 replica's backend is decommission
+ ColocateTableCheckerAndBalancer.getInstance().runAfterCatalogReady();
+ checkTabletStatus(tablet, TabletStatus.COLOCATE_MISMATCH, table,
partition);
+ doRepair();
+ checkTabletIsHealth(tablet, table, partition);
+ decommissionBe.setDecommissioned(false);
+
+ shutdownBackends(Lists.newArrayList(tablet.getBackendIds()));
+ doRepair();
+ // all replica dead
+ checkTabletStatus(tablet, TabletStatus.UNRECOVERABLE, table,
partition);
+ Assertions.assertEquals(0,
Env.getCurrentEnv().getTabletScheduler().getPendingNum());
+
+ dropTable(table.getName(), true);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index 8916cc44e91..a8d72eada36 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -98,7 +98,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
}
Assertions.assertNotNull(srcBackend);
- String decommissionStmtStr = "alter system decommission backend
\"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+ String decommissionStmtStr = "alter system decommission backend \"" +
srcBackend.getAddress() + "\"";
AlterSystemStmt decommissionStmt = (AlterSystemStmt)
parseAndAnalyzeStmt(decommissionStmtStr);
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
@@ -213,7 +213,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
dropTable("db3.tbl1", false);
// 6. execute decommission
- String decommissionStmtStr = "alter system decommission backend
\"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+ String decommissionStmtStr = "alter system decommission backend \"" +
srcBackend.getAddress() + "\"";
AlterSystemStmt decommissionStmt = (AlterSystemStmt)
parseAndAnalyzeStmt(decommissionStmtStr);
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
Assertions.assertTrue(srcBackend.isDecommissioned());
@@ -313,8 +313,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
// 4. query tablet num
int tabletNum =
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
- String decommissionStmtStr = "alter system decommission backend
\"127.0.0.1:"
- + srcBackend.getHeartbeatPort() + "\"";
+ String decommissionStmtStr = "alter system decommission backend \"" +
srcBackend.getAddress() + "\"";
AlterSystemStmt decommissionStmt = (AlterSystemStmt)
parseAndAnalyzeStmt(decommissionStmtStr);
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 063ab21d8bc..510b2ba00eb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -412,7 +412,7 @@ public abstract class TestWithFeService {
protected void createDorisCluster()
throws InterruptedException, NotInitException, IOException,
DdlException, EnvVarNotSetException,
FeStartException {
- createDorisCluster(runningDir, backendNum());
+ createDorisClusterWithMultiTag(runningDir, backendNum());
}
protected void createDorisCluster(String runningDir, int backendNum)
@@ -425,26 +425,13 @@ public abstract class TestWithFeService {
bes.add(createBackend("127.0.0.1", feRpcPort));
}
System.out.println("after create backend");
- checkBEHeartbeat(bes);
+ if (!checkBEHeartbeat(bes)) {
+ System.out.println("Some backends dead, all backends: " + bes);
+ }
// Thread.sleep(2000);
System.out.println("after create backend2");
}
- private void checkBEHeartbeat(List<Backend> bes) throws
InterruptedException {
- int maxTry = Config.heartbeat_interval_second + 2;
- boolean allAlive = false;
- while (maxTry-- > 0 && !allAlive) {
- Thread.sleep(1000);
- boolean hasDead = false;
- for (Backend be : bes) {
- if (!be.isAlive()) {
- hasDead = true;
- }
- }
- allAlive = !hasDead;
- }
- }
-
// Create multi backends with different host for unit test.
// the host of BE will be "127.0.0.1", "127.0.0.2"
protected void createDorisClusterWithMultiTag(String runningDir, int
backendNum)
@@ -452,14 +439,45 @@ public abstract class TestWithFeService {
InterruptedException {
// set runningUnitTest to true, so that for ut, the agent task will be
send to "127.0.0.1"
// to make cluster running well.
- FeConstants.runningUnitTest = true;
+ if (backendNum > 1) {
+ FeConstants.runningUnitTest = true;
+ }
int feRpcPort = startFEServer(runningDir);
List<Backend> bes = Lists.newArrayList();
+ System.out.println("start create backend, backend num " + backendNum);
for (int i = 0; i < backendNum; i++) {
String host = "127.0.0." + (i + 1);
bes.add(createBackend(host, feRpcPort));
}
- checkBEHeartbeat(bes);
+ System.out.println("after create backend");
+ if (!checkBEHeartbeat(bes)) {
+ System.out.println("Some backends dead, all backends: " + bes);
+ }
+ System.out.println("after create backend2");
+ }
+
+ protected boolean checkBEHeartbeat(List<Backend> bes) {
+ return checkBEHeartbeatStatus(bes, true);
+ }
+
+ protected boolean checkBELostHeartbeat(List<Backend> bes) {
+ return checkBEHeartbeatStatus(bes, false);
+ }
+
+ private boolean checkBEHeartbeatStatus(List<Backend> bes, boolean isAlive)
{
+ int maxTry = Config.heartbeat_interval_second + 2;
+ while (maxTry-- > 0) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // no exception
+ }
+ if (bes.stream().allMatch(be -> be.isAlive() == isAlive)) {
+ return true;
+ }
+ }
+
+ return false;
}
protected Backend addNewBackend() throws IOException, InterruptedException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]