This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new be9288352a9 [fix][cloud] Prevent fe CloudReplica.getBackendIdImpl
consuming too much CPU when high-frequency stream loads (#48564)
be9288352a9 is described below
commit be9288352a968432162cdd415b8644ce7f866a72
Author: deardeng <[email protected]>
AuthorDate: Thu Mar 13 20:37:10 2025 +0800
[fix][cloud] Prevent fe CloudReplica.getBackendIdImpl consuming too much
CPU when high-frequency stream loads (#48564)
---
.../java/org/apache/doris/backup/RestoreJob.java | 5 +--
.../apache/doris/catalog/ColocateTableIndex.java | 17 ++++++++-
.../org/apache/doris/catalog/MetadataViewer.java | 21 ++++++-----
.../org/apache/doris/clone/DiskRebalancer.java | 8 ++---
.../apache/doris/clone/PartitionRebalancer.java | 5 +--
.../org/apache/doris/clone/TabletSchedCtx.java | 42 ++++++++++++----------
.../org/apache/doris/clone/TabletScheduler.java | 18 +++++-----
.../apache/doris/cloud/catalog/CloudReplica.java | 4 +--
.../apache/doris/common/proc/ReplicasProcNode.java | 5 +--
.../apache/doris/common/proc/TabletsProcDir.java | 7 ++--
.../main/java/org/apache/doris/load/DeleteJob.java | 2 +-
.../org/apache/doris/planner/OlapScanNode.java | 7 ++--
.../doris/transaction/DatabaseTransactionMgr.java | 5 +--
13 files changed, 89 insertions(+), 57 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 2e5ea7977be..19f01378ed8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1254,7 +1254,8 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
boolean isRestoreTask = true;
// We don't care the visible version in restore job, the
end version is used.
long visibleVersion = -1L;
- SnapshotTask task = new SnapshotTask(null,
replica.getBackendIdWithoutException(),
+ long beId = replica.getBackendIdWithoutException();
+ SnapshotTask task = new SnapshotTask(null, beId,
signature, jobId, db.getId(),
tbl.getId(), part.getId(), index.getId(),
tablet.getId(), visibleVersion,
tbl.getSchemaHashByIndexId(index.getId()),
timeoutMs, isRestoreTask);
@@ -1263,7 +1264,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
}
batchTask.addTask(task);
unfinishedSignatureToId.put(signature, tablet.getId());
- bePathsMap.put(replica.getBackendIdWithoutException(),
replica.getPathHash());
+ bePathsMap.put(beId, replica.getPathHash());
} finally {
tbl.readUnlock();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index dae95698807..fcdd7042160 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -176,7 +176,7 @@ public class ColocateTableIndex implements Writable {
private Multimap<GroupId, Long> group2Tables = ArrayListMultimap.create();
// table_id -> group_id
@SerializedName(value = "table2Group")
- private Map<Long, GroupId> table2Group = Maps.newHashMap();
+ private Map<Long, GroupId> table2Group = Maps.newConcurrentMap();
// group id -> group schema
@SerializedName(value = "group2Schema")
private Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
@@ -385,6 +385,13 @@ public class ColocateTableIndex implements Writable {
}
}
+ // ATTN: lock-free variant of isColocateTable for the cloud hot path (CloudReplica.getBackendIdImpl);
+ // taking the index read lock there can saturate FE CPU under high concurrency. table2Group is a concurrent map.
+ // NOTE(review): confirm deserialization (e.g. Gson) does not replace table2Group with a non-concurrent map.
+ public boolean isColocateTableNoLock(long tableId) {
+ return table2Group.keySet().contains(tableId);
+ }
+
public boolean isColocateTable(long tableId) {
readLock();
try {
@@ -424,6 +431,14 @@ public class ColocateTableIndex implements Writable {
}
}
+ // ATTN: lock-free variant of getGroup for the cloud hot path (CloudReplica.getBackendIdImpl); table2Group is a concurrent map.
+ // Read the map once: a containsKey-then-get pair can observe a concurrent removal in between and return null.
+ public GroupId getGroupNoLock(long tableId) {
+ GroupId groupId = table2Group.get(tableId);
+ Preconditions.checkState(groupId != null, "table %s is not in any colocate group", tableId);
+ return groupId;
+ }
+
public GroupId getGroup(long tableId) {
readLock();
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
index 61853bea3d4..3d557c13e49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
@@ -90,7 +90,8 @@ public class MetadataViewer {
List<String> row = Lists.newArrayList();
ReplicaStatus status = ReplicaStatus.OK;
- Backend be =
infoService.getBackend(replica.getBackendIdWithoutException());
+ long beId = replica.getBackendIdWithoutException();
+ Backend be = infoService.getBackend(beId);
if (be == null || !be.isAlive() ||
replica.isBad()) {
status = ReplicaStatus.DEAD;
} else if (replica.getVersion() < visibleVersion
@@ -109,7 +110,7 @@ public class MetadataViewer {
row.add(String.valueOf(tabletId));
row.add(String.valueOf(replica.getId()));
-
row.add(String.valueOf(replica.getBackendIdWithoutException()));
+ row.add(String.valueOf(beId));
row.add(String.valueOf(replica.getVersion()));
row.add(String.valueOf(replica.getLastFailedVersion()));
row.add(String.valueOf(replica.getLastSuccessVersion()));
@@ -197,7 +198,8 @@ public class MetadataViewer {
List<String> row = Lists.newArrayList();
ReplicaStatus status = ReplicaStatus.OK;
- Backend be =
infoService.getBackend(replica.getBackendIdWithoutException());
+ long beId = replica.getBackendIdWithoutException();
+ Backend be = infoService.getBackend(beId);
if (be == null || !be.isAlive() ||
replica.isBad()) {
status = ReplicaStatus.DEAD;
} else if (replica.getVersion() < visibleVersion
@@ -216,7 +218,7 @@ public class MetadataViewer {
row.add(String.valueOf(tabletId));
row.add(String.valueOf(replica.getId()));
-
row.add(String.valueOf(replica.getBackendIdWithoutException()));
+ row.add(String.valueOf(beId));
row.add(String.valueOf(replica.getVersion()));
row.add(String.valueOf(replica.getLastFailedVersion()));
row.add(String.valueOf(replica.getLastSuccessVersion()));
@@ -338,13 +340,14 @@ public class MetadataViewer {
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- if
(!countMap.containsKey(replica.getBackendIdWithoutException())) {
+ long beId = replica.getBackendIdWithoutException();
+ if (!countMap.containsKey(beId)) {
continue;
}
-
countMap.put(replica.getBackendIdWithoutException(),
-
countMap.get(replica.getBackendIdWithoutException()) + 1);
- sizeMap.put(replica.getBackendIdWithoutException(),
-
sizeMap.get(replica.getBackendIdWithoutException()) + replica.getDataSize());
+ countMap.put(beId,
+ countMap.get(beId) + 1);
+ sizeMap.put(beId,
+ sizeMap.get(beId) + replica.getDataSize());
totalReplicaNum++;
totalReplicaSize += replica.getDataSize();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index c66a28a39cf..6fc413a861c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -313,12 +313,12 @@ public class DiskRebalancer extends Rebalancer {
if (replica.getDataSize() == 0) {
throw new SchedException(Status.UNRECOVERABLE,
SubCode.DIAGNOSE_IGNORE, "size of src replica is zero");
}
-
+ long beId = replica.getBackendIdWithoutException();
// check src slot
- PathSlot slot =
backendsWorkingSlots.get(replica.getBackendIdWithoutException());
+ PathSlot slot = backendsWorkingSlots.get(beId);
if (slot == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("BE does not have slot: {}",
replica.getBackendIdWithoutException());
+ LOG.debug("BE does not have slot: {}", beId);
}
throw new SchedException(Status.UNRECOVERABLE, "unable to take src
slot");
}
@@ -329,7 +329,7 @@ public class DiskRebalancer extends Rebalancer {
// after take src slot, we can set src replica now
tabletCtx.setSrc(replica);
- BackendLoadStatistic beStat =
clusterStat.getBackendLoadStatistic(replica.getBackendIdWithoutException());
+ BackendLoadStatistic beStat =
clusterStat.getBackendLoadStatistic(beId);
if (!beStat.isAvailable()) {
throw new SchedException(Status.UNRECOVERABLE, "the backend is not
available");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 657523123ad..30a7a76b920 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -284,9 +284,10 @@ public class PartitionRebalancer extends Rebalancer {
// Check src replica's validation
Replica srcReplica =
tabletCtx.getTablet().getReplicaByBackendId(move.fromBe);
Preconditions.checkNotNull(srcReplica);
- TabletScheduler.PathSlot slot =
backendsWorkingSlots.get(srcReplica.getBackendIdWithoutException());
+ long beId = srcReplica.getBackendIdWithoutException();
+ TabletScheduler.PathSlot slot = backendsWorkingSlots.get(beId);
Preconditions.checkNotNull(slot, "unable to get fromBe "
- + srcReplica.getBackendIdWithoutException() + " slot");
+ + beId + " slot");
if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
tabletCtx.setSrc(srcReplica);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index b8a098cc891..11d134cabc2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -511,27 +511,28 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
String host = backend.getHost();
for (Replica replica : tablet.getReplicas()) {
- Backend be =
infoService.getBackend(replica.getBackendIdWithoutException());
+ long replicaBeId = replica.getBackendIdWithoutException();
+ Backend be = infoService.getBackend(beId);
if (be == null) {
// BE has been dropped, skip it
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not exist, skip.
tablet: {}",
- replica.getBackendIdWithoutException(), tabletId);
+ replicaBeId, tabletId);
}
continue;
}
if (!Config.allow_replica_on_same_host &&
!FeConstants.runningUnitTest && host.equals(be.getHost())) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} is on same host {}, skip.
tablet: {}",
- replica.getBackendIdWithoutException(), host,
tabletId);
+ replicaBeId, host, tabletId);
}
return true;
}
- if (replica.getBackendIdWithoutException() == beId) {
+ if (replicaBeId == beId) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} is same as dest backend
{}, skip. tablet: {}",
- replica.getBackendIdWithoutException(), beId,
tabletId);
+ replicaBeId, beId, tabletId);
}
return true;
}
@@ -587,10 +588,11 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
*/
List<Replica> candidates = Lists.newArrayList();
for (Replica replica : tablet.getReplicas()) {
- if (exceptBeId != -1 && replica.getBackendIdWithoutException() ==
exceptBeId) {
+ long replicaBeId = replica.getBackendIdWithoutException();
+ if (exceptBeId != -1 && replicaBeId == exceptBeId) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} is same as except backend
{}, skip. tablet: {}",
- replica.getBackendIdWithoutException(),
exceptBeId, tabletId);
+ replicaBeId, exceptBeId, tabletId);
}
continue;
}
@@ -603,12 +605,12 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
continue;
}
- Backend be =
infoService.getBackend(replica.getBackendIdWithoutException());
+ Backend be = infoService.getBackend(replicaBeId);
if (be == null || !be.isAlive()) {
// backend which is in decommission can still be the source
backend
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not exist or is not
alive, skip. tablet: {}",
- replica.getBackendIdWithoutException(), tabletId);
+ replicaBeId, tabletId);
}
continue;
}
@@ -640,11 +642,12 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// sort replica by version count asc, so that we prefer to choose
replicas with fewer versions
Collections.sort(candidates, VERSION_COUNTER_COMPARATOR);
for (Replica srcReplica : candidates) {
- PathSlot slot =
backendsWorkingSlots.get(srcReplica.getBackendIdWithoutException());
+ long replicaBeId = srcReplica.getBackendIdWithoutException();
+ PathSlot slot = backendsWorkingSlots.get(replicaBeId);
if (slot == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not have working
slot, skip. tablet: {}",
- srcReplica.getBackendIdWithoutException(),
tabletId);
+ replicaBeId, tabletId);
}
continue;
}
@@ -653,7 +656,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
if (srcPathHash == -1) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not have available
slot, skip. tablet: {}",
- srcReplica.getBackendIdWithoutException(),
tabletId);
+ replicaBeId, tabletId);
}
continue;
}
@@ -701,10 +704,11 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
if (!replica.isScheduleAvailable()) {
- if
(Env.getCurrentSystemInfo().checkBackendScheduleAvailable(replica.getBackendIdWithoutException()))
{
+ long replicaBeId = replica.getBackendIdWithoutException();
+ if
(Env.getCurrentSystemInfo().checkBackendScheduleAvailable(replicaBeId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("replica's backend {} does not exist or is
not scheduler available, skip. tablet: {}",
- replica.getBackendIdWithoutException(),
tabletId);
+ replicaBeId, tabletId);
}
} else {
if (LOG.isDebugEnabled()) {
@@ -816,6 +820,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_SLOT,
"unable to take slot of dest path");
}
+ long chosenReplicaBeId = chosenReplica.getBackendIdWithoutException();
if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
// Since this replica is selected as the repair object of
VERSION_INCOMPLETE,
@@ -838,9 +843,9 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
setDecommissionTime(-1);
LOG.info("choose replica {} on backend {} of tablet {} as dest
replica for version incomplete,"
+ " and change state from DECOMMISSION to NORMAL",
- chosenReplica.getId(),
chosenReplica.getBackendIdWithoutException(), tabletId);
+ chosenReplica.getId(), chosenReplicaBeId, tabletId);
}
- setDest(chosenReplica.getBackendIdWithoutException(),
chosenReplica.getPathHash());
+ setDest(chosenReplicaBeId, chosenReplica.getPathHash());
}
private boolean checkFurtherRepairFinish(Replica replica, long version) {
@@ -974,10 +979,11 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// database lock should be held.
public CloneTask createCloneReplicaAndTask() throws SchedException {
- Backend srcBe =
infoService.getBackend(srcReplica.getBackendIdWithoutException());
+ long beId = srcReplica.getBackendIdWithoutException();
+ Backend srcBe = infoService.getBackend(beId);
if (srcBe == null) {
throw new SchedException(Status.SCHEDULE_FAILED,
- "src backend " + srcReplica.getBackendIdWithoutException() + "
does not exist");
+ "src backend " + beId + " does not exist");
}
Backend destBe = infoService.getBackend(destBackendId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index da13d5c61c5..d2413552b8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1107,8 +1107,9 @@ public class TabletScheduler extends MasterDaemon {
double maxScore = 0;
long debugHighBeId =
DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L);
for (Replica replica : replicas) {
+ long beId = replica.getBackendIdWithoutException();
BackendLoadStatistic beStatistic = statistic
-
.getBackendLoadStatistic(replica.getBackendIdWithoutException());
+ .getBackendLoadStatistic(beId);
if (beStatistic == null) {
continue;
}
@@ -1132,7 +1133,7 @@ public class TabletScheduler extends MasterDaemon {
chosenReplica = replica;
}
- if (debugHighBeId > 0 && replica.getBackendIdWithoutException() ==
debugHighBeId) {
+ if (debugHighBeId > 0 && beId == debugHighBeId) {
chosenReplica = replica;
break;
}
@@ -1219,6 +1220,7 @@ public class TabletScheduler extends MasterDaemon {
* If all are finished, which means this replica is
* safe to be deleted.
*/
+ long beId = replica.getBackendIdWithoutException();
if (!force && !Config.enable_force_drop_redundant_replica
&& !FeConstants.runningUnitTest
&& (replica.getState().canLoad() || replica.getState() ==
ReplicaState.DECOMMISSION)) {
@@ -1228,7 +1230,7 @@ public class TabletScheduler extends MasterDaemon {
// Remain it as VERY_HIGH may block other task.
tabletCtx.setPriority(Priority.NORMAL);
LOG.info("set replica {} on backend {} of tablet {} state to
DECOMMISSION due to reason {}",
- replica.getId(),
replica.getBackendIdWithoutException(), tabletCtx.getTabletId(), reason);
+ replica.getId(), beId, tabletCtx.getTabletId(),
reason);
}
try {
long preWatermarkTxnId = replica.getPreWatermarkTxnId();
@@ -1237,7 +1239,7 @@ public class TabletScheduler extends MasterDaemon {
.getTransactionIDGenerator().getNextTransactionId();
replica.setPreWatermarkTxnId(preWatermarkTxnId);
LOG.info("set decommission replica {} on backend {} of
tablet {} pre watermark txn id {}",
- replica.getId(), replica.getBackendId(),
tabletCtx.getTabletId(), preWatermarkTxnId);
+ replica.getId(), beId, tabletCtx.getTabletId(),
preWatermarkTxnId);
}
long postWatermarkTxnId = replica.getPostWatermarkTxnId();
@@ -1251,7 +1253,7 @@ public class TabletScheduler extends MasterDaemon {
replica.setPostWatermarkTxnId(postWatermarkTxnId);
LOG.info("set decommission replica {} on backend {} of
tablet {} post watermark txn id {}",
- replica.getId(), replica.getBackendId(),
tabletCtx.getTabletId(), postWatermarkTxnId);
+ replica.getId(), beId, tabletCtx.getTabletId(),
postWatermarkTxnId);
}
if
(!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(postWatermarkTxnId,
@@ -1276,7 +1278,7 @@ public class TabletScheduler extends MasterDaemon {
// NOTICE: only delete the replica from meta may not work.
sometimes we can depend on tablet report
// deleting these replicas, but in FORCE_REDUNDANT case, replica
may be added to meta again in report
// process.
- sendDeleteReplicaTask(replica.getBackendIdWithoutException(),
tabletCtx.getTabletId(), replica.getId(),
+ sendDeleteReplicaTask(beId, tabletCtx.getTabletId(),
replica.getId(),
tabletCtx.getSchemaHash());
}
@@ -1286,12 +1288,12 @@ public class TabletScheduler extends MasterDaemon {
tabletCtx.getPartitionId(),
tabletCtx.getIndexId(),
tabletCtx.getTabletId(),
- replica.getBackendIdWithoutException());
+ beId);
Env.getCurrentEnv().getEditLog().logDeleteReplica(info);
LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {},
force: {}",
- tabletCtx.getTabletId(),
replica.getBackendIdWithoutException(), reason, force);
+ tabletCtx.getTabletId(), beId, reason, force);
}
private void sendDeleteReplicaTask(long backendId, long tabletId, long
replicaId, int schemaHash) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 12f27eae3d0..f27b9fdf0cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -94,7 +94,7 @@ public class CloudReplica extends Replica {
}
private boolean isColocated() {
- return Env.getCurrentColocateIndex().isColocateTable(tableId);
+ return Env.getCurrentColocateIndex().isColocateTableNoLock(tableId);
}
public long getColocatedBeId(String clusterId) throws
ComputeGroupException {
@@ -130,7 +130,7 @@ public class CloudReplica extends Replica {
ComputeGroupException.FailedTypeEnum.COMPUTE_GROUPS_NO_ALIVE_BE);
}
- GroupId groupId = Env.getCurrentColocateIndex().getGroup(tableId);
+ GroupId groupId =
Env.getCurrentColocateIndex().getGroupNoLock(tableId);
HashCode hashCode = Hashing.murmur3_128().hashLong(groupId.grpId);
if (availableBes.size() != bes.size()) {
// some be is dead recently, still hash tablets on all backends.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
index d7958f75504..3d8d1c46fa0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
@@ -91,7 +91,8 @@ public class ReplicasProcNode implements ProcNodeInterface {
}
for (Replica replica : replicas) {
- Backend be =
backendMap.get(replica.getBackendIdWithoutException());
+ long beId = replica.getBackendIdWithoutException();
+ Backend be = backendMap.get(beId);
String host = (be == null ? Backend.DUMMY_IP : be.getHost());
int port = (be == null ? 0 : be.getHttpPort());
String hostPort = NetUtils.getHostPortInAccessibleFormat(host,
port);
@@ -117,7 +118,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
queryHits =
QueryStatsUtil.getMergedReplicaStats(replica.getId());
}
List<String> replicaInfo =
Lists.newArrayList(String.valueOf(replica.getId()),
- String.valueOf(replica.getBackendIdWithoutException()),
+ String.valueOf(beId),
String.valueOf(replica.getVersion()),
String.valueOf(replica.getLastSuccessVersion()),
String.valueOf(replica.getLastFailedVersion()),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index 8eca5f84faa..2a8aa7e35b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -141,8 +141,9 @@ public class TabletsProcDir implements ProcDirInterface {
tabletInfos.add(tabletInfo);
} else {
for (Replica replica : tablet.getReplicas()) {
+ long beId = replica.getBackendIdWithoutException();
if ((version > -1 && replica.getVersion() != version)
- || (backendId > -1 &&
replica.getBackendIdWithoutException() != backendId)
+ || (backendId > -1 && beId != backendId)
|| (state != null && replica.getState() !=
state)) {
continue;
}
@@ -150,7 +151,7 @@ public class TabletsProcDir implements ProcDirInterface {
// tabletId -- replicaId -- backendId -- version --
dataSize -- rowCount -- state
tabletInfo.add(tabletId);
tabletInfo.add(replica.getId());
- tabletInfo.add(replica.getBackendIdWithoutException());
+ tabletInfo.add(beId);
tabletInfo.add(replica.getSchemaHash());
tabletInfo.add(replica.getVersion());
tabletInfo.add(replica.getLastSuccessVersion());
@@ -168,7 +169,7 @@ public class TabletsProcDir implements ProcDirInterface {
tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L));
tabletInfo.add(replica.getPathHash());
tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), ""));
- Backend be =
backendMap.get(replica.getBackendIdWithoutException());
+ Backend be = backendMap.get(beId);
String host = (be == null ? Backend.DUMMY_IP :
be.getHost());
int port = (be == null ? 0 : be.getHttpPort());
String hostPort =
NetUtils.getHostPortInAccessibleFormat(host, port);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 664896693cb..b83fd878e08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -352,7 +352,7 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback implements DeleteJ
// signature, adding 10 billion to `getNextId`. We are
confident that the old signature
// generated will not exceed this number.
PushTask pushTask = new PushTask(null,
- replica.getBackendId(), targetDb.getId(),
targetTbl.getId(),
+ backendId, targetDb.getId(), targetTbl.getId(),
partition.getId(), indexId,
tabletId, replicaId, schemaHash,
-1, "", -1, 0,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 33965e30a73..5a1aaec5085 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -838,11 +838,12 @@ public class OlapScanNode extends ScanNode {
replicas.sort(Replica.ID_COMPARATOR);
Replica replica = replicas.get(useFixReplica >=
replicas.size() ? replicas.size() - 1 : useFixReplica);
if
(context.getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) {
- Backend backend =
Env.getCurrentSystemInfo().getBackend(replica.getBackendId());
+ long beId = replica.getBackendId();
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(beId);
// If the fixed replica is bad, then not clear the
replicas using random replica
if (backend == null || !backend.isAlive()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("backend {} not exists or is not alive
for replica {}", replica.getBackendId(),
+ LOG.debug("backend {} not exists or is not alive
for replica {}", beId,
replica.getId());
}
Collections.shuffle(replicas);
@@ -928,7 +929,7 @@ public class OlapScanNode extends ScanNode {
String ip = backend.getHost();
int port = backend.getBePort();
TScanRangeLocation scanRangeLocation = new
TScanRangeLocation(new TNetworkAddress(ip, port));
- scanRangeLocation.setBackendId(replica.getBackendId());
+ scanRangeLocation.setBackendId(backendId);
locations.addToLocations(scanRangeLocation);
paloRange.addToHosts(new TNetworkAddress(ip, port));
tabletIsNull = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index cc2a4b1a90f..59d5bc571f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -2260,10 +2260,11 @@ public class DatabaseTransactionMgr {
if (newVersion == Partition.PARTITION_INIT_VERSION
+ 1) {
index.setRowCountReported(false);
}
- Set<Long> partitionIds =
backendPartitions.get(replica.getBackendIdWithoutException());
+ long beId = replica.getBackendIdWithoutException();
+ Set<Long> partitionIds =
backendPartitions.get(beId);
if (partitionIds == null) {
partitionIds = Sets.newHashSet();
-
backendPartitions.put(replica.getBackendIdWithoutException(), partitionIds);
+ backendPartitions.put(beId, partitionIds);
}
partitionIds.add(partitionId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]