This is an automated email from the ASF dual-hosted git repository.
shunzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new 0def69f1 Add logic to remove id-conflict KVRanges by considering the
lexicographical order of store-id as a tie-breaker (#162)
0def69f1 is described below
commit 0def69f1a9808305dfdac3c2a0102e3a35e591eb
Author: Yonny(Yu) Hao <[email protected]>
AuthorDate: Wed Aug 6 16:18:13 2025 +0800
Add logic to remove id-conflict KVRanges by considering the lexicographical
order of store-id as a tie-breaker (#162)
---
.../impl/RedundantRangeRemovalBalancer.java | 64 +++++++++++--
.../impl/RedundantRangeRemovalBalancerTest.java | 105 ++++++++++++++++++++-
2 files changed, 159 insertions(+), 10 deletions(-)
diff --git
a/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancer.java
b/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancer.java
index 216a67dc..bedf16fb 100644
---
a/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancer.java
+++
b/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancer.java
@@ -23,19 +23,25 @@ import static
org.apache.bifromq.basekv.balance.util.CommandUtil.quit;
import static org.apache.bifromq.basekv.utils.DescriptorUtil.getEffectiveRoute;
import static org.apache.bifromq.basekv.utils.DescriptorUtil.organizeByEpoch;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.bifromq.basekv.balance.BalanceResult;
import org.apache.bifromq.basekv.balance.NoNeedBalance;
import org.apache.bifromq.basekv.balance.StoreBalancer;
import org.apache.bifromq.basekv.proto.Boundary;
import org.apache.bifromq.basekv.proto.KVRangeDescriptor;
+import org.apache.bifromq.basekv.proto.KVRangeId;
import org.apache.bifromq.basekv.proto.KVRangeStoreDescriptor;
import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus;
import org.apache.bifromq.basekv.utils.EffectiveEpoch;
+import org.apache.bifromq.basekv.utils.KVRangeIdUtil;
import org.apache.bifromq.basekv.utils.LeaderRange;
-import java.util.Collections;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
/**
 * The RedundantRangeRemovalBalancer is a specialized StoreBalancer designed
to manage and remove redundant replicas
@@ -72,7 +78,7 @@ public class RedundantRangeRemovalBalancer extends
StoreBalancer {
return NoNeedBalance.INSTANCE;
}
if (latest.size() > 1) {
- // deal with higher epoch redundant replicas generated during
bootstrap at startup time
+ // deal with epoch-conflict ranges
Set<KVRangeStoreDescriptor> storeDescriptors =
latest.lastEntry().getValue();
for (KVRangeStoreDescriptor storeDescriptor : storeDescriptors) {
if (!storeDescriptor.getId().equals(localStoreId)) {
@@ -82,12 +88,33 @@ public class RedundantRangeRemovalBalancer extends
StoreBalancer {
if (rangeDescriptor.getRole() != RaftNodeStatus.Leader) {
continue;
}
+ log.debug("Remove Epoch-Conflict range: {} in store {}",
+ KVRangeIdUtil.toString(rangeDescriptor.getId()),
+ storeDescriptor.getId());
return quit(localStoreId, rangeDescriptor);
}
}
+ return NoNeedBalance.INSTANCE;
}
- // deal with redundant replicas generated within the effective epoch
but not be included in the effective route
Map.Entry<Long, Set<KVRangeStoreDescriptor>> oldestEntry =
latest.firstEntry();
+ Map<KVRangeId, SortedSet<LeaderRange>> conflictingRanges =
findConflictingRanges(oldestEntry.getValue());
+ if (!conflictingRanges.isEmpty()) {
+ // deal with id-conflict ranges
+ for (KVRangeId rangeId : conflictingRanges.keySet()) {
+ SortedSet<LeaderRange> leaderRanges =
conflictingRanges.get(rangeId);
+ for (LeaderRange leaderRange : leaderRanges) {
+ if
(!leaderRange.ownerStoreDescriptor().getId().equals(localStoreId)) {
+ return NoNeedBalance.INSTANCE;
+ }
+ log.debug("Remove Id-Conflict range: {} in store {}",
+
KVRangeIdUtil.toString(leaderRange.descriptor().getId()),
+ leaderRange.ownerStoreDescriptor().getId());
+ return quit(localStoreId, leaderRange.descriptor());
+ }
+ }
+ return NoNeedBalance.INSTANCE;
+ }
+ // deal with boundary-conflict ranges
EffectiveEpoch effectiveEpoch = new
EffectiveEpoch(oldestEntry.getKey(), oldestEntry.getValue());
NavigableMap<Boundary, LeaderRange> effectiveLeaders =
getEffectiveRoute(effectiveEpoch).leaderRanges();
for (KVRangeStoreDescriptor storeDescriptor :
effectiveEpoch.storeDescriptors()) {
@@ -101,10 +128,35 @@ public class RedundantRangeRemovalBalancer extends
StoreBalancer {
Boundary boundary = rangeDescriptor.getBoundary();
LeaderRange leaderRange = effectiveLeaders.get(boundary);
if (leaderRange == null ||
!leaderRange.descriptor().getId().equals(rangeDescriptor.getId())) {
+ log.debug("Remove Boundary-Conflict range: {} in store {}",
+ KVRangeIdUtil.toString(rangeDescriptor.getId()),
+ storeDescriptor.getId());
return quit(localStoreId, rangeDescriptor);
}
}
}
return NoNeedBalance.INSTANCE;
}
+
+    /**
+     * Finds ranges that more than one store reports itself leading, i.e. the same {@link KVRangeId}
+     * appears with role {@link RaftNodeStatus#Leader} in at least two store descriptors.
+     *
+     * <p>The returned map is ordered by range id (epoch first, then id) rather than by hash. The caller
+     * acts only on the first conflict it visits. A deterministic order therefore makes every store that
+     * shares the same cluster view pick the same conflict, and that conflict's loser then quits. With a
+     * hash-ordered map, the visiting order would depend on {@code KVRangeId#hashCode()}. Protobuf does
+     * not promise that value is identical across processes, so stores could each stall on a different
+     * conflict that they do not lose.
+     *
+     * @param storeDescriptors the store descriptors to scan (the oldest epoch's descriptors at the call site)
+     * @return conflicting range ids mapped to their leader replicas. Each set is ordered by owner store id,
+     *     lexicographically greatest first, so {@code first()} is the replica the caller removes. Empty
+     *     when there is no conflict.
+     */
+    private Map<KVRangeId, SortedSet<LeaderRange>> findConflictingRanges(
+        Set<KVRangeStoreDescriptor> storeDescriptors) {
+        // every leader replica seen so far, grouped by range id
+        Map<KVRangeId, SortedSet<LeaderRange>> leaderRangesByRangeId = new HashMap<>();
+        // only the groups with more than one leader, in a store-independent order
+        Map<KVRangeId, SortedSet<LeaderRange>> conflictingRanges = new TreeMap<>(
+            Comparator.comparingLong(KVRangeId::getEpoch).thenComparingLong(KVRangeId::getId));
+        for (KVRangeStoreDescriptor storeDescriptor : storeDescriptors) {
+            for (KVRangeDescriptor rangeDescriptor : storeDescriptor.getRangesList()) {
+                if (rangeDescriptor.getRole() != RaftNodeStatus.Leader) {
+                    continue;
+                }
+                KVRangeId rangeId = rangeDescriptor.getId();
+                SortedSet<LeaderRange> leaderRanges = leaderRangesByRangeId.computeIfAbsent(rangeId,
+                    k -> new TreeSet<>(Comparator.comparing(
+                        (LeaderRange lr) -> lr.ownerStoreDescriptor().getId()).reversed()));
+                leaderRanges.add(new LeaderRange(rangeDescriptor, storeDescriptor));
+                if (leaderRanges.size() > 1) {
+                    // more than one leader for the same range id: record (or re-record) it as a conflict
+                    conflictingRanges.put(rangeId, leaderRanges);
+                }
+            }
+        }
+        return conflictingRanges;
+    }
}
diff --git
a/base-kv/base-kv-store-balance-controller/src/test/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancerTest.java
b/base-kv/base-kv-store-balance-controller/src/test/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancerTest.java
index 83f2a4a4..ecfeca3b 100644
---
a/base-kv/base-kv-store-balance-controller/src/test/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancerTest.java
+++
b/base-kv/base-kv-store-balance-controller/src/test/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancerTest.java
@@ -22,6 +22,10 @@ package org.apache.bifromq.basekv.balance.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
+import com.google.protobuf.ByteString;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.bifromq.basekv.balance.BalanceNow;
import org.apache.bifromq.basekv.balance.BalanceResult;
import org.apache.bifromq.basekv.balance.BalanceResultType;
@@ -32,10 +36,6 @@ import org.apache.bifromq.basekv.proto.KVRangeId;
import org.apache.bifromq.basekv.proto.KVRangeStoreDescriptor;
import org.apache.bifromq.basekv.raft.proto.ClusterConfig;
import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus;
-import com.google.protobuf.ByteString;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -245,4 +245,101 @@ public class RedundantRangeRemovalBalancerTest {
BalanceResult result = balancer.balance();
assertSame(result.type(), BalanceResultType.NoNeedBalance);
}
+
+    @Test
+    public void removeIdConflictingRangeWhenLocalStoreIsLoser() {
+        // Two stores both claim leadership of the same range id. The replica owned by the
+        // lexicographically greatest store id is the one removed. "aStore" sorts before "localStore"
+        // (assuming localStoreId is "localStore"), so the local replica must quit.
+        String peerStoreId = "aStore";
+        KVRangeId rangeId = KVRangeId.newBuilder().setEpoch(1).setId(1).build();
+        Boundary rangeBoundary = Boundary.newBuilder()
+            .setStartKey(ByteString.copyFromUtf8("a"))
+            .setEndKey(ByteString.copyFromUtf8("z"))
+            .build();
+
+        // Both leader replicas share everything except their voter set.
+        KVRangeDescriptor leaderTemplate = KVRangeDescriptor.newBuilder()
+            .setId(rangeId)
+            .setRole(RaftNodeStatus.Leader)
+            .setVer(1)
+            .setBoundary(rangeBoundary)
+            .build();
+        KVRangeDescriptor localLeader = leaderTemplate.toBuilder()
+            .setConfig(ClusterConfig.newBuilder().addVoters(localStoreId).build())
+            .build();
+        KVRangeDescriptor peerLeader = leaderTemplate.toBuilder()
+            .setConfig(ClusterConfig.newBuilder().addVoters(peerStoreId).build())
+            .build();
+
+        balancer.update(Set.of(
+            KVRangeStoreDescriptor.newBuilder().setId(localStoreId).addRanges(localLeader).build(),
+            KVRangeStoreDescriptor.newBuilder().setId(peerStoreId).addRanges(peerLeader).build()));
+
+        BalanceResult outcome = balancer.balance();
+        assertEquals(outcome.type(), BalanceResultType.BalanceNow);
+
+        // Quitting shows up as a config change to empty voter and learner sets on the local store.
+        ChangeConfigCommand quitCommand = (ChangeConfigCommand) ((BalanceNow<?>) outcome).command;
+        assertEquals(quitCommand.getToStore(), localStoreId);
+        assertEquals(quitCommand.getKvRangeId(), rangeId);
+        assertEquals(quitCommand.getVoters(), Collections.emptySet());
+        assertEquals(quitCommand.getLearners(), Collections.emptySet());
+    }
+
+    @Test
+    public void ignoreIdConflictingRangeWhenLocalStoreIsWinner() {
+        // Same id conflict as the loser case, but "zStore" sorts after "localStore". The peer replica
+        // is the one to be removed, so the local balancer must stay idle.
+        String peerStoreId = "zStore";
+        KVRangeId rangeId = KVRangeId.newBuilder().setEpoch(1).setId(1).build();
+        Boundary rangeBoundary = Boundary.newBuilder()
+            .setStartKey(ByteString.copyFromUtf8("a"))
+            .setEndKey(ByteString.copyFromUtf8("z"))
+            .build();
+
+        // Both leader replicas share everything except their voter set.
+        KVRangeDescriptor leaderTemplate = KVRangeDescriptor.newBuilder()
+            .setId(rangeId)
+            .setRole(RaftNodeStatus.Leader)
+            .setVer(1)
+            .setBoundary(rangeBoundary)
+            .build();
+        KVRangeDescriptor localLeader = leaderTemplate.toBuilder()
+            .setConfig(ClusterConfig.newBuilder().addVoters(localStoreId).build())
+            .build();
+        KVRangeDescriptor peerLeader = leaderTemplate.toBuilder()
+            .setConfig(ClusterConfig.newBuilder().addVoters(peerStoreId).build())
+            .build();
+
+        balancer.update(Set.of(
+            KVRangeStoreDescriptor.newBuilder().setId(localStoreId).addRanges(localLeader).build(),
+            KVRangeStoreDescriptor.newBuilder().setId(peerStoreId).addRanges(peerLeader).build()));
+
+        BalanceResult outcome = balancer.balance();
+        assertSame(outcome.type(), BalanceResultType.NoNeedBalance);
+    }
}
\ No newline at end of file