This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 40b3d58e084 [improvement](cloud) Accelerate cloud rebalance by batch
editlog (#37787)
40b3d58e084 is described below
commit 40b3d58e0840025e9d708da5b2daf4aaea3c883c
Author: deardeng <[email protected]>
AuthorDate: Fri Jul 19 11:29:58 2024 +0800
[improvement](cloud) Accelerate cloud rebalance by batch editlog (#37787)
1. Use `JournalBatch` to batch edit logs.
2. Merge the edit logs of different tablets in the same partition into a single edit log.
Environment:
Docker cloud mode with 3 FEs and 3 BEs.
Scaling out from 3 BEs to 4 BEs triggers a cloud rebalance.
The table has 1860 partitions with 48 buckets each, each rebalance loop balances a minimum of 12 tablets,
and pre-cache (warm-up) is disabled.
result:
```
before improvement
2024-07-16 16:51:01,371 INFO (cloud tablet rebalancer|77)
[CloudTabletRebalancer.runAfterCatalogReady():228]
finished to rebalancer. cost: 58471 ms
after improvement
2024-07-16 17:10:20,699 INFO (cloud tablet rebalancer|77)
[CloudTabletRebalancer.runAfterCatalogReady():235]
finished to rebalancer. cost: 28687 ms
```
---
.../analysis/ShowReplicaDistributionStmt.java | 6 +-
.../apache/doris/cloud/catalog/CloudReplica.java | 6 +-
.../doris/cloud/catalog/CloudTabletRebalancer.java | 158 +++++++++++++++++---
.../cloud/datasource/CloudInternalCatalog.java | 7 +-
.../cloud/persist/UpdateCloudReplicaInfo.java | 53 ++-----
.../org/apache/doris/journal/JournalBatch.java | 6 +
.../java/org/apache/doris/persist/EditLog.java | 24 ++-
.../org/apache/doris/regression/suite/Suite.groovy | 11 ++
.../cloud_p0/multi_cluster/test_rebalance.groovy | 165 +++++++++++++++++++++
9 files changed, 372 insertions(+), 64 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
index 6d598be727a..58d2ac1052f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowReplicaDistributionStmt.java
@@ -82,6 +82,10 @@ public class ShowReplicaDistributionStmt extends ShowStmt {
@Override
public RedirectStatus getRedirectStatus() {
- return RedirectStatus.FORWARD_NO_SYNC;
+ if (ConnectContext.get().getSessionVariable().getForwardToMaster()) {
+ return RedirectStatus.FORWARD_NO_SYNC;
+ } else {
+ return RedirectStatus.NO_FORWARD;
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index d2e173dfd53..be0c510559e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -25,6 +25,7 @@ import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
@@ -221,7 +222,10 @@ public class CloudReplica extends Replica {
return backendId;
}
}
-
+ if
(DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) {
+ LOG.info("Debug Point enable
CloudReplica.getBackendIdImpl.clusterToBackends");
+ return -1;
+ }
return hashReplicaToBe(clusterId, false);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 30149de0d56..73ddbe4c455 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -45,6 +45,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
+import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -57,6 +58,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
public class CloudTabletRebalancer extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(CloudTabletRebalancer.class);
@@ -181,8 +183,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
// 2 complete route info
replicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
completeRouteInfo();
- for (UpdateCloudReplicaInfo info : replicaInfos) {
- Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+ LOG.info("collect to editlog route {} infos", replicaInfos.size());
+ try {
+
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(replicaInfos);
+ } catch (Exception e) {
+ LOG.warn("failed to update cloud replicas", e);
+ // edit log failed, try next time
+ return;
}
// 3 check whether the inflight preheating task has been completed
@@ -238,9 +245,20 @@ public class CloudTabletRebalancer extends MasterDaemon {
entry.getKey(), entry.getValue().size());
}
+ List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
// balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
- balanceInPartition(entry.getValue(), entry.getKey());
+ balanceInPartition(entry.getValue(), entry.getKey(), infos);
+ }
+ long oldSize = infos.size();
+ infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+ LOG.info("collect to editlog partitions before size={} after size={}
infos", oldSize, infos.size());
+ try {
+ Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+ } catch (Exception e) {
+ LOG.warn("failed to update cloud replicas", e);
+ // edit log failed, try next time
+ return;
}
for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
@@ -263,9 +281,20 @@ public class CloudTabletRebalancer extends MasterDaemon {
entry.getKey(), entry.getValue().size());
}
+ List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
// balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
- balanceInTable(entry.getValue(), entry.getKey());
+ balanceInTable(entry.getValue(), entry.getKey(), infos);
+ }
+ long oldSize = infos.size();
+ infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+ LOG.info("collect to editlog table before size={} after size={}
infos", oldSize, infos.size());
+ try {
+ Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+ } catch (Exception e) {
+ LOG.warn("failed to update cloud replicas", e);
+ // edit log failed, try next time
+ return;
}
for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
@@ -288,8 +317,19 @@ public class CloudTabletRebalancer extends MasterDaemon {
entry.getKey(), entry.getValue().size());
}
+ List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
- balanceImpl(entry.getValue(), entry.getKey(),
futureBeToTabletsGlobal, BalanceType.GLOBAL);
+ balanceImpl(entry.getValue(), entry.getKey(),
futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
+ }
+ long oldSize = infos.size();
+ infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+ LOG.info("collect to editlog global before size={} after size={}
infos", oldSize, infos.size());
+ try {
+ Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+ } catch (Exception e) {
+ LOG.warn("failed to update cloud replicas", e);
+ // edit log failed, try next time
+ return;
}
for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
@@ -310,6 +350,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
}
+ List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
LOG.info("before pre cache check dest be {} inflight task num {}",
entry.getKey(), entry.getValue().size());
Backend destBackend =
cloudSystemInfoService.getBackend(entry.getKey());
@@ -335,11 +376,21 @@ public class CloudTabletRebalancer extends MasterDaemon {
if (!result.getValue()) {
LOG.info("{} pre cache timeout, forced to change the
mapping", result.getKey());
}
- updateClusterToBeMap(task.pickedTablet, task.destBe,
task.clusterId);
+ updateClusterToBeMap(task.pickedTablet, task.destBe,
task.clusterId, infos);
tabletToInfightTask.remove(result.getKey());
}
}
}
+ long oldSize = infos.size();
+ infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+ LOG.info("collect to editlog warmup before size={} after size={}
infos", oldSize, infos.size());
+ try {
+ Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+ } catch (Exception e) {
+ LOG.warn("failed to update cloud replicas", e);
+ // edit log failed, try next time
+ return;
+ }
// recalculate inflight beToTablets, just for print the log
beToTabletIds = new HashMap<Long, List<Long>>();
@@ -550,22 +601,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
}
- public void balanceInPartition(List<Long> bes, String clusterId) {
+ public void balanceInPartition(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
// balance all partition
for (Map.Entry<Long, Map<Long, Map<Long, List<Tablet>>>>
partitionEntry : futurePartitionToTablets.entrySet()) {
Map<Long, Map<Long, List<Tablet>>> indexToTablets =
partitionEntry.getValue();
// balance all index of a partition
for (Map.Entry<Long, Map<Long, List<Tablet>>> entry :
indexToTablets.entrySet()) {
// balance a index
- balanceImpl(bes, clusterId, entry.getValue(),
BalanceType.PARTITION);
+ balanceImpl(bes, clusterId, entry.getValue(),
BalanceType.PARTITION, infos);
}
}
}
- public void balanceInTable(List<Long> bes, String clusterId) {
+ public void balanceInTable(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
// balance all tables
for (Map.Entry<Long, Map<Long, List<Tablet>>> entry :
futureBeToTabletsInTable.entrySet()) {
- balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE);
+ balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE,
infos);
}
}
@@ -641,7 +692,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
partToTablets);
}
- private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String
clusterId) {
+ private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String
clusterId,
+ List<UpdateCloudReplicaInfo> infos) {
CloudReplica cloudReplica = (CloudReplica)
pickedTablet.getReplicas().get(0);
cloudReplica.updateClusterToBe(clusterId, destBe);
Database db =
Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
@@ -663,7 +715,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
UpdateCloudReplicaInfo info = new
UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(), cloudReplica.getPartitionId(),
cloudReplica.getIndexId(),
pickedTablet.getId(), cloudReplica.getId(), clusterId,
destBe);
- Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+ infos.add(info);
} finally {
table.readUnlock();
}
@@ -765,7 +817,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private void balanceImpl(List<Long> bes, String clusterId, Map<Long,
List<Tablet>> beToTablets,
- BalanceType balanceType) {
+ BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
if (bes == null || bes.isEmpty() || beToTablets == null ||
beToTablets.isEmpty()) {
return;
}
@@ -852,7 +904,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
beToTabletsInTable, partitionToTablets);
updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
- updateClusterToBeMap(pickedTablet, destBe, clusterId);
+ updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
}
}
}
@@ -869,17 +921,16 @@ public class CloudTabletRebalancer extends MasterDaemon {
List<Tablet> tablets = new ArrayList<>();
if (!beToTabletsGlobal.containsKey(srcBe)) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set
inactive", srcBe);
- // TODO(merge-cloud): wait add cloud upgrade mgr
- //
Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
+ ((CloudEnv)
Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
tablets = beToTabletsGlobal.get(srcBe);
if (tablets.isEmpty()) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set
inactive", srcBe);
- // TODO(merge-cloud): wait add cloud upgrade mgr
- //
Env.getCurrentEnv().getCloudUpgradeMgr().setBeStateInactive(srcBe);
+ ((CloudEnv)
Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
+ List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Tablet tablet : tablets) {
// get replica
CloudReplica cloudReplica = (CloudReplica)
tablet.getReplicas().get(0);
@@ -915,11 +966,21 @@ public class CloudTabletRebalancer extends MasterDaemon {
UpdateCloudReplicaInfo info = new
UpdateCloudReplicaInfo(cloudReplica.getDbId(),
cloudReplica.getTableId(),
cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
tablet.getId(), cloudReplica.getId(), clusterId,
dstBe);
- Env.getCurrentEnv().getEditLog().logUpdateCloudReplica(info);
+ infos.add(info);
} finally {
table.readUnlock();
}
}
+ long oldSize = infos.size();
+ infos = batchUpdateCloudReplicaInfoEditlogs(infos);
+ LOG.info("collect to editlog migrate before size={} after size={}
infos", oldSize, infos.size());
+ try {
+ Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
+ } catch (Exception e) {
+ LOG.warn("update cloud replicas failed", e);
+ // edit log failed, try next time
+ throw new RuntimeException(e);
+ }
try {
((CloudEnv)
Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
@@ -928,5 +989,64 @@ public class CloudTabletRebalancer extends MasterDaemon {
throw new RuntimeException(e);
}
}
+
+    /**
+     * Merges per-tablet {@link UpdateCloudReplicaInfo} records so that all tablet moves that share
+     * the same (clusterId, dbId, tableId, partitionId, indexId) are persisted with one edit log entry.
+     *
+     * <p>The location key is built from all four ids joined by a separator. Summing the ids is not
+     * injective: two different locations whose ids add up to the same value would be merged into
+     * one record carrying only the last entry's location, and a sanity check that compares sums
+     * again could not detect it.
+     *
+     * @param infos per-tablet records, in the order the moves were decided
+     * @return one merged record per location, with tabletId set to -1 and the per-tablet ids stored
+     *         in the parallel tabletIds / replicaIds / beIds lists (in the original encounter order)
+     */
+    private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos) {
+        long start = System.currentTimeMillis();
+        List<UpdateCloudReplicaInfo> rets = new ArrayList<>();
+        // clusterId -> infos of that cluster
+        Map<String, List<UpdateCloudReplicaInfo>> clusterIdToInfos = infos.stream()
+                .collect(Collectors.groupingBy(UpdateCloudReplicaInfo::getClusterId));
+        for (Map.Entry<String, List<UpdateCloudReplicaInfo>> entry : clusterIdToInfos.entrySet()) {
+            String clusterId = entry.getKey();
+            // "dbId:tableId:partitionId:indexId" -> infos at that location. groupingBy keeps the
+            // encounter order inside each list, so per-tablet replay order is unchanged.
+            Map<String, List<UpdateCloudReplicaInfo>> sameLocationInfos = entry.getValue().stream()
+                    .collect(Collectors.groupingBy(info -> info.getDbId() + ":" + info.getTableId()
+                            + ":" + info.getPartitionId() + ":" + info.getIndexId()));
+            sameLocationInfos.forEach((location, locationInfos) -> {
+                // every info in the group shares the same location, so the first one is representative
+                UpdateCloudReplicaInfo first = locationInfos.get(0);
+                UpdateCloudReplicaInfo newInfo = new UpdateCloudReplicaInfo();
+                for (UpdateCloudReplicaInfo info : locationInfos) {
+                    // template form: the message is only formatted when the check actually fails
+                    Preconditions.checkState(clusterId.equals(info.getClusterId()),
+                            "impossible, cluster id not eq outer=%s, inner=%s", clusterId, info.getClusterId());
+                    Preconditions.checkState(info.getDbId() == first.getDbId()
+                                    && info.getTableId() == first.getTableId()
+                                    && info.getPartitionId() == first.getPartitionId()
+                                    && info.getIndexId() == first.getIndexId(),
+                            "impossible, some locations do not match location, location=%s, dbId=%s, tableId=%s,"
+                                    + " partitionId=%s, indexId=%s",
+                            location, info.getDbId(), info.getTableId(), info.getPartitionId(), info.getIndexId());
+                    newInfo.getTabletIds().add(info.getTabletId());
+                    newInfo.getReplicaIds().add(info.getReplicaId());
+                    newInfo.getBeIds().add(info.getBeId());
+                }
+                newInfo.setDbId(first.getDbId());
+                newInfo.setTableId(first.getTableId());
+                newInfo.setPartitionId(first.getPartitionId());
+                newInfo.setIndexId(first.getIndexId());
+                newInfo.setClusterId(clusterId);
+                // APPR: in unprotectUpdateCloudReplica, use batch must set tabletId = -1
+                newInfo.setTabletId(-1);
+                rets.add(newInfo);
+            });
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("batchUpdateCloudReplicaInfoEditlogs old size {}, cur size {} cost {} ms",
+                    infos.size(), rets.size(), System.currentTimeMillis() - start);
+        }
+        return rets;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 9d5c2825777..ed79486d57b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -939,7 +939,12 @@ public class CloudInternalCatalog extends InternalCatalog {
List<Long> tabletIds = info.getTabletIds();
for (int i = 0; i < tabletIds.size(); ++i) {
Tablet tablet =
materializedIndex.getTablet(tabletIds.get(i));
- Replica replica = tablet.getReplicas().get(0);
+ Replica replica;
+ if (info.getReplicaIds().isEmpty()) {
+ replica = tablet.getReplicas().get(0);
+ } else {
+ replica =
tablet.getReplicaById(info.getReplicaIds().get(i));
+ }
Preconditions.checkNotNull(replica, info);
String clusterId = info.getClusterId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
index 1ff2912a397..c5d6fc0cd64 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/UpdateCloudReplicaInfo.java
@@ -17,11 +17,14 @@
package org.apache.doris.cloud.persist;
+
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+import lombok.Setter;
import java.io.DataInput;
import java.io.DataOutput;
@@ -29,6 +32,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+@Getter
+@Setter
public class UpdateCloudReplicaInfo implements Writable {
@SerializedName(value = "dbId")
private long dbId;
@@ -54,6 +59,9 @@ public class UpdateCloudReplicaInfo implements Writable {
@SerializedName(value = "beIds")
private List<Long> beIds = new ArrayList<Long>();
+ @SerializedName(value = "rids")
+ private List<Long> replicaIds = new ArrayList<>();
+
public UpdateCloudReplicaInfo() {
}
@@ -97,46 +105,6 @@ public class UpdateCloudReplicaInfo implements Writable {
return GsonUtils.GSON.fromJson(json, UpdateCloudReplicaInfo.class);
}
- public long getDbId() {
- return dbId;
- }
-
- public long getTableId() {
- return tableId;
- }
-
- public long getPartitionId() {
- return partitionId;
- }
-
- public long getIndexId() {
- return indexId;
- }
-
- public long getTabletId() {
- return tabletId;
- }
-
- public long getReplicaId() {
- return replicaId;
- }
-
- public String getClusterId() {
- return clusterId;
- }
-
- public long getBeId() {
- return beId;
- }
-
- public List<Long> getBeIds() {
- return beIds;
- }
-
- public List<Long> getTabletIds() {
- return tabletIds;
- }
-
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("database id: ").append(dbId);
@@ -159,6 +127,11 @@ public class UpdateCloudReplicaInfo implements Writable {
for (long id : tabletIds) {
sb.append(" ").append(id);
}
+
+ sb.append(" replica id list: ");
+ for (long id : replicaIds) {
+ sb.append(" ").append(id);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
index 12d62b68717..e56bf34dfe5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
@@ -21,6 +21,8 @@ import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.OperationType;
+import lombok.Getter;
+
import java.io.IOException;
import java.util.ArrayList;
@@ -29,6 +31,9 @@ public class JournalBatch {
private ArrayList<Entity> entities;
+ @Getter
+ private long size = 0;
+
public JournalBatch() {
entities = new ArrayList<>();
}
@@ -56,6 +61,7 @@ public class JournalBatch {
DataOutputBuffer buffer = new
DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
entity.write(buffer);
+ size += buffer.size();
entities.add(new Entity(op, buffer));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index eb26bbc04f0..c3a326f6d48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -59,6 +59,7 @@ import org.apache.doris.ha.MasterInfo;
import org.apache.doris.insertoverwrite.InsertOverwriteLog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.journal.bdbje.BDBJEJournal;
@@ -1264,6 +1265,21 @@ public class EditLog {
journal.rollJournal();
}
+ /**
+ * Writes every element of {@code entries} as an operation of type {@code op}, grouping them
+ * into {@link JournalBatch}es that are passed to {@code journal.write} one batch at a time.
+ *
+ * <p>Before an entry is added, the current batch is flushed if it already holds 32 entities or
+ * at least 640 KB of serialized data (JournalBatch#getSize sums the serialized buffer sizes).
+ * Because the check happens before adding, one batch may exceed 640 KB by a single entry.
+ * An empty {@code entries} list writes nothing.
+ *
+ * @throws IOException presumably from serializing an entry (addJournal) or from journal.write
+ */
+ private synchronized <T extends Writable> void logEdit(short op, List<T>
entries) throws IOException {
+ // NOTE(review): 35 looks like an initial-capacity hint slightly above the 32-entity limit; confirm
+ JournalBatch batch = new JournalBatch(35);
+ for (T entry : entries) {
+ // flush once the batch holds 32 entities or at least 640 KB of serialized data
+ if (batch.getJournalEntities().size() >= 32 || batch.getSize() >=
640 * 1024) {
+ journal.write(batch);
+ batch = new JournalBatch(35);
+ }
+ batch.addJournal(op, entry);
+ }
+ // write the final, partially filled batch (skipped when entries was empty)
+ if (!batch.getJournalEntities().isEmpty()) {
+ journal.write(batch);
+ }
+ }
+
/**
* Write an operation to the edit log. Do not sync to persistent store yet.
*/
@@ -1576,8 +1592,12 @@ public class EditLog {
logEdit(OperationType.OP_EXPORT_CREATE, job);
}
- public void logUpdateCloudReplica(UpdateCloudReplicaInfo info) {
- logEdit(OperationType.OP_UPDATE_CLOUD_REPLICA, info);
+ public void logUpdateCloudReplicas(List<UpdateCloudReplicaInfo> infos)
throws IOException {
+ long start = System.currentTimeMillis();
+ logEdit(OperationType.OP_UPDATE_CLOUD_REPLICA, infos);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("log update {} cloud replicas. cost: {} ms",
infos.size(), (System.currentTimeMillis() - start));
+ }
}
public void logExportUpdateState(long jobId, ExportJobState newState) {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 8397a638c36..a964996c236 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -17,6 +17,8 @@
package org.apache.doris.regression.suite
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
import groovy.json.JsonOutput
import com.google.common.collect.Maps
import com.google.common.util.concurrent.Futures
@@ -260,6 +262,15 @@ class Suite implements GroovyInterceptable {
return context.connect(user, password, url, actionSupplier)
}
+ /**
+ * Polls {@code actionSupplier} with Awaitility every {@code intervalSecond} seconds until it
+ * returns true, waiting at most {@code atMostSeconds} seconds (Awaitility fails the wait with a
+ * timeout exception otherwise).
+ *
+ * <p>Each poll runs {@code actionSupplier} through {@link #connect}, reusing the username, password
+ * and JDBC URL of the current thread's connection, so the condition is evaluated against the same
+ * FE the test is connected to. Assumes the current thread already has a connection
+ * (context.threadLocalConn is non-null) — TODO confirm for callers outside docker suites.
+ */
+ public void dockerAwaitUntil(int atMostSeconds, int intervalSecond = 1,
Closure actionSupplier) {
+ def connInfo = context.threadLocalConn.get()
+ Awaitility.await().atMost(atMostSeconds,
SECONDS).pollInterval(intervalSecond, SECONDS).until(
+ {
+ connect(connInfo.username, connInfo.password,
connInfo.conn.getMetaData().getURL(), actionSupplier)
+ }
+ )
+ }
+
public void docker(ClusterOptions options = new ClusterOptions(), Closure
actionSupplier) throws Exception {
if (context.config.excludeDockerTest) {
return
diff --git
a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
new file mode 100644
index 00000000000..e1735a4acd4
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+suite('test_rebalance_in_cloud', 'multi_cluster') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'enable_cloud_warm_up_for_rebalance=false',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'cloud_balance_tablet_percent_per_run=0.5',
+        'cloud_pre_heating_time_limit_sec=1',
+        'sys_log_verbose_modules=org',
+    ]
+    options.setFeNum(3)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.connectToFollower = true
+    options.enableDebugPoints()
+
+    // Runs ADMIN SHOW REPLICA DISTRIBUTION on `target`, asserts that both BEs are listed, and
+    // asserts that every BE reporting a non-zero replica count holds between `lo` and `hi` replicas.
+    // Rows with 0 replicas are tolerated: with the CloudReplica.getBackendIdImpl.clusterToBackends
+    // debug point enabled, an FE returns BE -1 instead of hashing a replica, so the FE this test
+    // is connected to may presumably attribute no replicas to one of the BEs.
+    def checkReplicaDistribution = { target, int lo, int hi ->
+        def rows = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM ${target}"""
+        assertEquals(2, rows.size())
+        for (def row : rows) {
+            log.info("replica distribution: ${row} ".toString())
+            int replicaNum = Integer.valueOf((String) row.ReplicaNum)
+            if (replicaNum != 0) {
+                assertTrue(replicaNum <= hi && replicaNum >= lo)
+            }
+        }
+    }
+
+    docker(options) {
+        sql """
+            CREATE TABLE table100 (
+                class INT,
+                id INT,
+                score INT SUM
+            )
+            AGGREGATE KEY(class, id)
+            DISTRIBUTED BY HASH(class) BUCKETS 48
+        """
+
+        sql """
+            CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
+            AGGREGATE KEY(k1, k2)
+            PARTITION BY RANGE(k1) (
+                PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+                PARTITION p1993 VALUES [("19930101"), ("19940101")),
+                PARTITION p1994 VALUES [("19940101"), ("19950101")),
+                PARTITION p1995 VALUES [("19950101"), ("19960101")),
+                PARTITION p1996 VALUES [("19960101"), ("19970101")),
+                PARTITION p1997 VALUES [("19970101"), ("19980101")),
+                PARTITION p1998 VALUES [("19980101"), ("19990101")))
+            DISTRIBUTED BY HASH(k1) BUCKETS 3
+        """
+        GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.clusterToBackends")
+        sql """set global forward_to_master=false"""
+
+        // add a be
+        cluster.addBackend(1, null)
+
+        dockerAwaitUntil(30) {
+            def bes = sql """show backends"""
+            log.info("bes: {}", bes)
+            bes.size() == 2
+        }
+
+        dockerAwaitUntil(5) {
+            def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
+            log.info("replica distribution table100: {}", ret)
+            ret.size() == 2
+        }
+        // 48 buckets over 2 BEs: each BE should hold about 24 tablets
+        checkReplicaDistribution("table100", 23, 25)
+
+        dockerAwaitUntil(5) {
+            def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992)"""
+            log.info("replica distribution table_p2: {}", ret)
+            ret.size() == 2
+        }
+        // 3 buckets per partition over 2 BEs: each BE should hold 1 or 2 tablets of every partition
+        for (def partition : ['p1992', 'p1993', 'p1994', 'p1995', 'p1996', 'p1997']) {
+            checkReplicaDistribution("table_p2 PARTITION(${partition})", 1, 2)
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]