This is an automated email from the ASF dual-hosted git repository.

shunzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new 0def69f1 Add logic to remove id-conflict KVRanges by considering the 
lexicographical order of store-id as a tie-breaker (#162)
0def69f1 is described below

commit 0def69f1a9808305dfdac3c2a0102e3a35e591eb
Author: Yonny(Yu) Hao <[email protected]>
AuthorDate: Wed Aug 6 16:18:13 2025 +0800

    Add logic to remove id-conflict KVRanges by considering the lexicographical 
order of store-id as a tie-breaker (#162)
---
 .../impl/RedundantRangeRemovalBalancer.java        |  64 +++++++++++--
 .../impl/RedundantRangeRemovalBalancerTest.java    | 105 ++++++++++++++++++++-
 2 files changed, 159 insertions(+), 10 deletions(-)

diff --git 
a/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancer.java
 
b/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancer.java
index 216a67dc..bedf16fb 100644
--- 
a/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancer.java
+++ 
b/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancer.java
@@ -23,19 +23,25 @@ import static 
org.apache.bifromq.basekv.balance.util.CommandUtil.quit;
 import static org.apache.bifromq.basekv.utils.DescriptorUtil.getEffectiveRoute;
 import static org.apache.bifromq.basekv.utils.DescriptorUtil.organizeByEpoch;
 
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import org.apache.bifromq.basekv.balance.BalanceResult;
 import org.apache.bifromq.basekv.balance.NoNeedBalance;
 import org.apache.bifromq.basekv.balance.StoreBalancer;
 import org.apache.bifromq.basekv.proto.Boundary;
 import org.apache.bifromq.basekv.proto.KVRangeDescriptor;
+import org.apache.bifromq.basekv.proto.KVRangeId;
 import org.apache.bifromq.basekv.proto.KVRangeStoreDescriptor;
 import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus;
 import org.apache.bifromq.basekv.utils.EffectiveEpoch;
+import org.apache.bifromq.basekv.utils.KVRangeIdUtil;
 import org.apache.bifromq.basekv.utils.LeaderRange;
-import java.util.Collections;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
 
 /**
  * The RedundantEpochRemovalBalancer is a specialized StoreBalancer designed 
to manage and remove redundant replicas
@@ -72,7 +78,7 @@ public class RedundantRangeRemovalBalancer extends 
StoreBalancer {
             return NoNeedBalance.INSTANCE;
         }
         if (latest.size() > 1) {
-            // deal with higher epoch redundant replicas generated during 
bootstrap at startup time
+            // deal with epoch-conflict ranges
             Set<KVRangeStoreDescriptor> storeDescriptors = 
latest.lastEntry().getValue();
             for (KVRangeStoreDescriptor storeDescriptor : storeDescriptors) {
                 if (!storeDescriptor.getId().equals(localStoreId)) {
@@ -82,12 +88,33 @@ public class RedundantRangeRemovalBalancer extends 
StoreBalancer {
                     if (rangeDescriptor.getRole() != RaftNodeStatus.Leader) {
                         continue;
                     }
+                    log.debug("Remove Epoch-Conflict range: {} in store {}",
+                        KVRangeIdUtil.toString(rangeDescriptor.getId()),
+                        storeDescriptor.getId());
                     return quit(localStoreId, rangeDescriptor);
                 }
             }
+            return NoNeedBalance.INSTANCE;
         }
-        // deal with redundant replicas generated within the effective epoch 
but not be included in the effective route
         Map.Entry<Long, Set<KVRangeStoreDescriptor>> oldestEntry = 
latest.firstEntry();
+        Map<KVRangeId, SortedSet<LeaderRange>> conflictingRanges = 
findConflictingRanges(oldestEntry.getValue());
+        if (!conflictingRanges.isEmpty()) {
+            // deal with id-conflict ranges
+            for (KVRangeId rangeId : conflictingRanges.keySet()) {
+                SortedSet<LeaderRange> leaderRanges = 
conflictingRanges.get(rangeId);
+                for (LeaderRange leaderRange : leaderRanges) {
+                    if 
(!leaderRange.ownerStoreDescriptor().getId().equals(localStoreId)) {
+                        return NoNeedBalance.INSTANCE;
+                    }
+                    log.debug("Remove Id-Conflict range: {} in store {}",
+                        
KVRangeIdUtil.toString(leaderRange.descriptor().getId()),
+                        leaderRange.ownerStoreDescriptor().getId());
+                    return quit(localStoreId, leaderRange.descriptor());
+                }
+            }
+            return NoNeedBalance.INSTANCE;
+        }
+        // deal with boundary-conflict ranges
         EffectiveEpoch effectiveEpoch = new 
EffectiveEpoch(oldestEntry.getKey(), oldestEntry.getValue());
         NavigableMap<Boundary, LeaderRange> effectiveLeaders = 
getEffectiveRoute(effectiveEpoch).leaderRanges();
         for (KVRangeStoreDescriptor storeDescriptor : 
effectiveEpoch.storeDescriptors()) {
@@ -101,10 +128,35 @@ public class RedundantRangeRemovalBalancer extends 
StoreBalancer {
                 Boundary boundary = rangeDescriptor.getBoundary();
                 LeaderRange leaderRange = effectiveLeaders.get(boundary);
                 if (leaderRange == null || 
!leaderRange.descriptor().getId().equals(rangeDescriptor.getId())) {
+                    log.debug("Remove Boundary-Conflict range: {} in store {}",
+                        KVRangeIdUtil.toString(rangeDescriptor.getId()),
+                        storeDescriptor.getId());
                     return quit(localStoreId, rangeDescriptor);
                 }
             }
         }
         return NoNeedBalance.INSTANCE;
     }
+
+    private Map<KVRangeId, SortedSet<LeaderRange>> 
findConflictingRanges(Set<KVRangeStoreDescriptor> effectiveEpoch) {
+        Map<KVRangeId, SortedSet<LeaderRange>> leaderRangesByRangeId = new 
HashMap<>();
+        Map<KVRangeId, SortedSet<LeaderRange>> conflictingRanges = new 
HashMap<>();
+        for (KVRangeStoreDescriptor storeDescriptor : effectiveEpoch) {
+            for (KVRangeDescriptor rangeDescriptor : 
storeDescriptor.getRangesList()) {
+                if (rangeDescriptor.getRole() != RaftNodeStatus.Leader) {
+                    continue;
+                }
+                KVRangeId rangeId = rangeDescriptor.getId();
+                SortedSet<LeaderRange> leaderRanges = 
leaderRangesByRangeId.computeIfAbsent(rangeId, k -> new TreeSet<>(
+                    Comparator.comparing((LeaderRange lr) -> 
lr.ownerStoreDescriptor().getId(), String::compareTo)
+                        .reversed()));
+                leaderRanges.add(new LeaderRange(rangeDescriptor, 
storeDescriptor));
+                if (leaderRanges.size() > 1) {
+                    // More than one leader for the same range, add to 
conflicting ranges
+                    conflictingRanges.put(rangeId, leaderRanges);
+                }
+            }
+        }
+        return conflictingRanges;
+    }
 }
diff --git 
a/base-kv/base-kv-store-balance-controller/src/test/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancerTest.java
 
b/base-kv/base-kv-store-balance-controller/src/test/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancerTest.java
index 83f2a4a4..ecfeca3b 100644
--- 
a/base-kv/base-kv-store-balance-controller/src/test/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancerTest.java
+++ 
b/base-kv/base-kv-store-balance-controller/src/test/java/org/apache/bifromq/basekv/balance/impl/RedundantRangeRemovalBalancerTest.java
@@ -22,6 +22,10 @@ package org.apache.bifromq.basekv.balance.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
 
+import com.google.protobuf.ByteString;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.bifromq.basekv.balance.BalanceNow;
 import org.apache.bifromq.basekv.balance.BalanceResult;
 import org.apache.bifromq.basekv.balance.BalanceResultType;
@@ -32,10 +36,6 @@ import org.apache.bifromq.basekv.proto.KVRangeId;
 import org.apache.bifromq.basekv.proto.KVRangeStoreDescriptor;
 import org.apache.bifromq.basekv.raft.proto.ClusterConfig;
 import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus;
-import com.google.protobuf.ByteString;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -245,4 +245,101 @@ public class RedundantRangeRemovalBalancerTest {
         BalanceResult result = balancer.balance();
         assertSame(result.type(), BalanceResultType.NoNeedBalance);
     }
+
+    @Test
+    public void removeIdConflictingRangeWhenLocalStoreIsLoser() {
+        String peerStoreId = "aStore";
+        KVRangeId kvRangeId = 
KVRangeId.newBuilder().setEpoch(1).setId(1).build();
+        Boundary boundary = Boundary.newBuilder()
+            .setStartKey(ByteString.copyFromUtf8("a"))
+            .setEndKey(ByteString.copyFromUtf8("z"))
+            .build();
+
+        // Local store is Leader of the range
+        KVRangeDescriptor localRange = KVRangeDescriptor.newBuilder()
+            .setId(kvRangeId)
+            .setRole(RaftNodeStatus.Leader)
+            .setVer(1)
+            .setBoundary(boundary)
+            .setConfig(ClusterConfig.newBuilder()
+                .addVoters(localStoreId)
+                .build())
+            .build();
+
+        KVRangeDescriptor peerRange = KVRangeDescriptor.newBuilder()
+            .setId(kvRangeId)
+            .setRole(RaftNodeStatus.Leader)
+            .setVer(1)
+            .setBoundary(boundary)
+            .setConfig(ClusterConfig.newBuilder()
+                .addVoters(peerStoreId)
+                .build())
+            .build();
+
+        KVRangeStoreDescriptor localStoreDesc = 
KVRangeStoreDescriptor.newBuilder()
+            .setId(localStoreId)
+            .addRanges(localRange)
+            .build();
+
+        KVRangeStoreDescriptor peerStoreDesc = 
KVRangeStoreDescriptor.newBuilder()
+            .setId(peerStoreId)
+            .addRanges(peerRange)
+            .build();
+
+        balancer.update(Set.of(localStoreDesc, peerStoreDesc));
+
+        BalanceResult result = balancer.balance();
+        assertEquals(result.type(), BalanceResultType.BalanceNow);
+
+        ChangeConfigCommand cmd = (ChangeConfigCommand) ((BalanceNow<?>) 
result).command;
+        assertEquals(cmd.getToStore(), localStoreId);
+        assertEquals(cmd.getKvRangeId(), kvRangeId);
+        assertEquals(cmd.getVoters(), Collections.emptySet());
+        assertEquals(cmd.getLearners(), Collections.emptySet());
+    }
+
+    @Test
+    public void ignoreIdConflictingRangeWhenLocalStoreIsWinner() {
+        String peerStoreId = "zStore";              // larger than "localStore"
+        KVRangeId kvRangeId = 
KVRangeId.newBuilder().setEpoch(1).setId(1).build();
+        Boundary boundary = Boundary.newBuilder()
+            .setStartKey(ByteString.copyFromUtf8("a"))
+            .setEndKey(ByteString.copyFromUtf8("z"))
+            .build();
+
+        KVRangeDescriptor localRange = KVRangeDescriptor.newBuilder()
+            .setId(kvRangeId)
+            .setRole(RaftNodeStatus.Leader)
+            .setVer(1)
+            .setBoundary(boundary)
+            .setConfig(ClusterConfig.newBuilder()
+                .addVoters(localStoreId)
+                .build())
+            .build();
+
+        KVRangeDescriptor peerRange = KVRangeDescriptor.newBuilder()
+            .setId(kvRangeId)
+            .setRole(RaftNodeStatus.Leader)
+            .setVer(1)
+            .setBoundary(boundary)
+            .setConfig(ClusterConfig.newBuilder()
+                .addVoters(peerStoreId)
+                .build())
+            .build();
+
+        KVRangeStoreDescriptor localStoreDesc = 
KVRangeStoreDescriptor.newBuilder()
+            .setId(localStoreId)
+            .addRanges(localRange)
+            .build();
+
+        KVRangeStoreDescriptor peerStoreDesc = 
KVRangeStoreDescriptor.newBuilder()
+            .setId(peerStoreId)
+            .addRanges(peerRange)
+            .build();
+
+        balancer.update(Set.of(localStoreDesc, peerStoreDesc));
+
+        BalanceResult result = balancer.balance();
+        assertSame(result.type(), BalanceResultType.NoNeedBalance);
+    }
 }
\ No newline at end of file

Reply via email to