This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 55e468ee412 [fix][broker] Do not try to clean owned bundles from inactive source brokers (ExtensibleLoadManagerImpl only) (#23064)
55e468ee412 is described below

commit 55e468ee412fff9706f6badcd07b5c8f11ed8375
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Wed Jul 24 11:19:40 2024 -0700

    [fix][broker] Do not try to clean owned bundles from inactive source brokers (ExtensibleLoadManagerImpl only) (#23064)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  27 +++-
 .../channel/ServiceUnitStateChannelImpl.java       | 178 ++++++++-------------
 .../pulsar/broker/service/BrokerService.java       |   9 ++
 .../channel/ServiceUnitStateChannelTest.java       |  68 ++++++--
 4 files changed, 157 insertions(+), 125 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 4a7ba90aad9..a737a94b998 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -124,6 +124,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     private static final String ELECTION_ROOT = 
"/loadbalance/extension/leader";
 
+    private static final Set<String> INTERNAL_TOPICS =
+            Set.of(BROKER_LOAD_DATA_STORE_TOPIC, 
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);
+
     private PulsarService pulsar;
 
     private ServiceConfiguration conf;
@@ -828,7 +831,8 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     }
 
     public static boolean isInternalTopic(String topic) {
-        return topic.startsWith(TOPIC)
+        return INTERNAL_TOPICS.contains(topic)
+                || topic.startsWith(TOPIC)
                 || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
                 || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
     }
@@ -993,5 +997,26 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         serviceUnitStateChannel.cleanOwnerships();
         leaderElectionService.close();
         brokerRegistry.unregister();
+        // Close the internal topics (if owned any) after giving up the 
possible leader role,
+        // so that the subsequent lookups could hit the next leader.
+        closeInternalTopics();
+    }
+
+    private void closeInternalTopics() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String name : INTERNAL_TOPICS) {
+            futures.add(pulsar.getBrokerService().getTopicIfExists(name)
+                    .thenAccept(topicOptional -> topicOptional.ifPresent(topic 
-> topic.close(true)))
+                    .exceptionally(__ -> {
+                        log.warn("Failed to close internal topic:{}", name);
+                        return null;
+                    }));
+        }
+        try {
+            FutureUtil.waitForAll(futures)
+                    
.get(pulsar.getConfiguration().getNamespaceBundleUnloadingTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        } catch (Throwable e) {
+            log.warn("Failed to wait for closing internal topics", e);
+        }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 1688a892e23..fc4968805f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -638,20 +638,13 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     private CompletableFuture<Void> publishOverrideEventAsync(String 
serviceUnit,
-                                           ServiceUnitStateData orphanData,
                                            ServiceUnitStateData override) {
         if (!validateChannelState(Started, true)) {
             throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
         }
         EventType eventType = EventType.Override;
         eventCounters.get(eventType).getTotal().incrementAndGet();
-        return pubAsync(serviceUnit, override).whenComplete((__, e) -> {
-            if (e != null) {
-                eventCounters.get(eventType).getFailure().incrementAndGet();
-                log.error("Failed to override serviceUnit:{} from 
orphanData:{} to overrideData:{}",
-                        serviceUnit, orphanData, override, e);
-            }
-        }).thenApply(__ -> null);
+        return pubAsync(serviceUnit, override).thenApply(__ -> null);
     }
 
     public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
@@ -1307,24 +1300,49 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     private void overrideOwnership(String serviceUnit, ServiceUnitStateData 
orphanData, String inactiveBroker) {
         final var version = getNextVersionId(orphanData);
-        final var override = selectBroker(serviceUnit, 
inactiveBroker).map(selectedBroker -> {
-            if (orphanData.state() == Splitting) {
-                return new ServiceUnitStateData(Splitting, 
orphanData.dstBroker(), selectedBroker,
-                        Map.copyOf(orphanData.splitServiceUnitToDestBroker()), 
true, version);
-            } else {
-                return new ServiceUnitStateData(Owned, selectedBroker, 
inactiveBroker, true, version);
-            }
-        }).orElseGet(() -> new ServiceUnitStateData(Free, null, 
inactiveBroker, true, version));
-        log.info("Overriding ownership serviceUnit:{} from orphanData:{} to 
overrideData:{}",
-                serviceUnit, orphanData, override);
-        publishOverrideEventAsync(serviceUnit, orphanData, override)
-                .exceptionally(e -> {
-                    log.error(
-                            "Failed to override the ownership serviceUnit:{} 
orphanData:{}. "
-                                    + "Failed to publish override event. 
totalCleanupErrorCnt:{}",
-                            serviceUnit, orphanData, 
totalCleanupErrorCnt.incrementAndGet());
-                    return null;
-                });
+        try {
+            selectBroker(serviceUnit, inactiveBroker)
+                    .thenApply(selectedOpt ->
+                            selectedOpt.map(selectedBroker -> {
+                                if (orphanData.state() == Splitting) {
+                                    // if Splitting, set orphan.dstBroker() as 
dst to indicate where it was from.
+                                    // (The src broker runs handleSplitEvent.)
+                                    return new ServiceUnitStateData(Splitting, 
orphanData.dstBroker(), selectedBroker,
+                                            
Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
+                                } else if (orphanData.state() == Owned) {
+                                    // if Owned, set orphan.dstBroker() as 
source to clean it up in case it is still
+                                    // alive.
+                                    return new ServiceUnitStateData(Owned, 
selectedBroker,
+                                            
selectedBroker.equals(orphanData.dstBroker()) ? null :
+                                                    orphanData.dstBroker(),
+                                            true, version);
+                                } else {
+                                    // if Assigning or Releasing, set 
orphan.sourceBroker() as source
+                                    // to clean it up in case it is still 
alive.
+                                    return new ServiceUnitStateData(Owned, 
selectedBroker,
+                                            
selectedBroker.equals(orphanData.sourceBroker()) ? null :
+                                                    orphanData.sourceBroker(),
+                                            true, version);
+                                }
+                                // If no broker is selected(available), free 
the ownership.
+                                // If the previous owner is still active, it 
will close the bundle(topic) ownership.
+                            }).orElseGet(() -> new ServiceUnitStateData(Free, 
null,
+                                    orphanData.state() == Owned ? 
orphanData.dstBroker() : orphanData.sourceBroker(),
+                                    true,
+                                    version)))
+                    .thenCompose(override -> {
+                        log.info(
+                                "Overriding inactiveBroker:{}, ownership 
serviceUnit:{} from orphanData:{} to "
+                                        + "overrideData:{}",
+                                inactiveBroker, serviceUnit, orphanData, 
override);
+                        return publishOverrideEventAsync(serviceUnit, 
override);
+                    }).get(config.getMetadataStoreOperationTimeoutSeconds(), 
SECONDS);
+        } catch (Throwable e) {
+            log.error(
+                    "Failed to override inactiveBroker:{} ownership 
serviceUnit:{} orphanData:{}. "
+                            + "totalCleanupErrorCnt:{}",
+                    inactiveBroker, serviceUnit, orphanData, 
totalCleanupErrorCnt.incrementAndGet(), e);
+        }
     }
 
     private void waitForCleanups(String broker, boolean excludeSystemTopics, 
int maxWaitTimeInMillis) {
@@ -1440,60 +1458,13 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     }
 
-    private Optional<String> selectBroker(String serviceUnit, String 
inactiveBroker) {
-        try {
-            return loadManager.selectAsync(
-                    LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
-                            Set.of(inactiveBroker), 
LookupOptions.builder().build())
-                    .get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
-        } catch (Throwable e) {
-            log.error("Failed to select a broker for serviceUnit:{}", 
serviceUnit);
-        }
-        return Optional.empty();
+    private CompletableFuture<Optional<String>> selectBroker(String 
serviceUnit, String inactiveBroker) {
+        return getLoadManager().selectAsync(
+                LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
+                inactiveBroker == null ? Set.of() : Set.of(inactiveBroker),
+                LookupOptions.builder().build());
     }
 
-    private Optional<ServiceUnitStateData> getRollForwardStateData(String 
serviceUnit,
-                                                                   String 
inactiveBroker,
-                                                                   long 
nextVersionId) {
-        Optional<String> selectedBroker = selectBroker(serviceUnit, 
inactiveBroker);
-        if (selectedBroker.isEmpty()) {
-            return Optional.empty();
-        }
-        return Optional.of(new ServiceUnitStateData(Owned, 
selectedBroker.get(), true, nextVersionId));
-    }
-
-
-    private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
-            String serviceUnit, ServiceUnitStateData orphanData,
-            Set<String> availableBrokers) {
-        long nextVersionId = getNextVersionId(orphanData);
-        var state = orphanData.state();
-        switch (state) {
-            case Assigning: {
-                return getRollForwardStateData(serviceUnit, 
orphanData.dstBroker(), nextVersionId);
-            }
-            case Splitting: {
-                return Optional.of(new ServiceUnitStateData(Splitting,
-                        orphanData.dstBroker(), orphanData.sourceBroker(),
-                        Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
-                        true, nextVersionId));
-            }
-            case Releasing: {
-                if (availableBrokers.contains(orphanData.sourceBroker())) {
-                    // rollback to the src
-                    return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.sourceBroker(), true, nextVersionId));
-                } else {
-                    return getRollForwardStateData(serviceUnit, 
orphanData.sourceBroker(), nextVersionId);
-                }
-            }
-            default: {
-                var msg = String.format("Failed to get the overrideStateData 
from serviceUnit=%s, orphanData=%s",
-                        serviceUnit, orphanData);
-                log.error(msg);
-                throw new IllegalStateException(msg);
-            }
-        }
-    }
 
     @VisibleForTesting
     protected void monitorOwnerships(List<String> brokers) {
@@ -1521,7 +1492,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         long startTime = System.nanoTime();
         Set<String> inactiveBrokers = new HashSet<>();
         Set<String> activeBrokers = new HashSet<>(brokers);
-        Map<String, ServiceUnitStateData> orphanServiceUnits = new HashMap<>();
+        Map<String, ServiceUnitStateData> timedOutInFlightStateServiceUnits = 
new HashMap<>();
         int serviceUnitTombstoneCleanupCnt = 0;
         int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
@@ -1533,20 +1504,27 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             String srcBroker = stateData.sourceBroker();
             var state = stateData.state();
 
-            if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && 
!activeBrokers.contains(srcBroker)) {
+            if (state == Owned && (StringUtils.isBlank(dstBroker) || 
!activeBrokers.contains(dstBroker))) {
+                inactiveBrokers.add(dstBroker);
+                continue;
+            }
+
+            if (isInFlightState(state) && StringUtils.isNotBlank(srcBroker) && 
!activeBrokers.contains(srcBroker)) {
                 inactiveBrokers.add(srcBroker);
                 continue;
             }
-            if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) && 
!activeBrokers.contains(dstBroker)) {
+            if (isInFlightState(state) && StringUtils.isNotBlank(dstBroker) && 
!activeBrokers.contains(dstBroker)) {
                 inactiveBrokers.add(dstBroker);
                 continue;
             }
-            if (isActiveState(state) && isInFlightState(state)
+
+            if (isInFlightState(state)
                     && now - stateData.timestamp() > 
inFlightStateWaitingTimeInMillis) {
-                orphanServiceUnits.put(serviceUnit, stateData);
+                timedOutInFlightStateServiceUnits.put(serviceUnit, stateData);
                 continue;
             }
 
+
             if (!isActiveState(state) && now - stateData.timestamp() > 
stateTombstoneDelayTimeInMillis) {
                 log.info("Found semi-terminal states to tombstone"
                         + " serviceUnit:{}, stateData:{}", serviceUnit, 
stateData);
@@ -1562,37 +1540,21 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             }
         }
 
-        // Skip cleaning orphan bundles if inactiveBrokers exist. This is a 
bigger problem.
+
         if (!inactiveBrokers.isEmpty()) {
             for (String inactiveBroker : inactiveBrokers) {
                 handleBrokerDeletionEvent(inactiveBroker);
             }
-        } else if (!orphanServiceUnits.isEmpty()) {
-            for (var etr : orphanServiceUnits.entrySet()) {
+        }
+
+        // timedOutInFlightStateServiceUnits are the in-flight ones although 
their src and dst brokers are known to
+        // be active.
+        if (!timedOutInFlightStateServiceUnits.isEmpty()) {
+            for (var etr : timedOutInFlightStateServiceUnits.entrySet()) {
                 var orphanServiceUnit = etr.getKey();
                 var orphanData = etr.getValue();
-                var overrideData = getOverrideInFlightStateData(
-                        orphanServiceUnit, orphanData, activeBrokers);
-                if (overrideData.isPresent()) {
-                    log.info("Overriding in-flight state ownership 
serviceUnit:{} "
-                                    + "from orphanData:{} to overrideData:{}",
-                            orphanServiceUnit, orphanData, overrideData);
-                    publishOverrideEventAsync(orphanServiceUnit, orphanData, 
overrideData.get())
-                            .whenComplete((__, e) -> {
-                                if (e != null) {
-                                    log.error("Failed cleaning the ownership 
orphanServiceUnit:{}, orphanData:{}, "
-                                                    + "cleanupErrorCnt:{}.",
-                                            orphanServiceUnit, orphanData,
-                                            
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
-                                }
-                            });
-                    orphanServiceUnitCleanupCnt++;
-                } else {
-                    log.warn("Failed get the overrideStateData from 
orphanServiceUnit:{}, orphanData:{},"
-                                    + " cleanupErrorCnt:{}. will retry..",
-                            orphanServiceUnit, orphanData,
-                            totalCleanupErrorCnt.incrementAndGet() - 
totalCleanupErrorCntStart);
-                }
+                overrideOwnership(orphanServiceUnit, orphanData, null);
+                orphanServiceUnitCleanupCnt++;
             }
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c62da22ac68..5dec15fc19b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2282,6 +2282,15 @@ public class BrokerService implements Closeable {
         topics.forEach((name, topicFuture) -> {
             TopicName topicName = TopicName.get(name);
             if (serviceUnit.includes(topicName)) {
+                if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
+                        && 
ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
+                    if 
(ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log)) {
+                        log.info("[{}] Skip unloading ExtensibleLoadManager 
internal topics. Such internal topic "
+                                + "should be closed when shutting down the 
broker.", topicName);
+                    }
+                    return;
+                }
+
                 // Topic needs to be unloaded
                 log.info("[{}] Unloading topic", topicName);
                 if (topicFuture.isCompletedExceptionally()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 837aceca141..aef68aff9a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -110,6 +110,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     private ServiceUnitStateChannel channel2;
     private String brokerId1;
     private String brokerId2;
+    private String brokerId3;
     private String bundle;
     private String bundle1;
     private String bundle2;
@@ -161,6 +162,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 FieldUtils.readDeclaredField(channel1, "brokerId", true);
         brokerId2 = (String)
                 FieldUtils.readDeclaredField(channel2, "brokerId", true);
+        brokerId3 = "broker-3";
 
         bundle = "public/default/0x00000000_0xffffffff";
         bundle1 = "public/default/0x00000000_0xfffffff0";
@@ -1235,7 +1237,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
 
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : 
channel2;
-
+        doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
+                .when(loadManager).selectAsync(any(), any(), any());
         waitUntilStateWithMonitor(leader, bundle, Init);
         waitUntilStateWithMonitor(channel1, bundle, Init);
         waitUntilStateWithMonitor(channel2, bundle, Init);
@@ -1426,6 +1429,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                     assertEquals(3, count.get());
                 });
         var leader = channel1.isChannelOwnerAsync().get() ? channel1 : 
channel2;
+        doReturn(CompletableFuture.completedFuture(Optional.of(brokerId1)))
+                .when(loadManager).selectAsync(any(), any(), any());
         ((ServiceUnitStateChannelImpl) leader)
                 .monitorOwnerships(List.of(brokerId1, brokerId2));
 
@@ -1569,26 +1574,40 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         String broker = brokerId1;
 
         // test override states
-        String releasingBundle = "public/releasing/0xfffffff0_0xffffffff";
+        String releasingBundle1 = "public/releasing1/0xfffffff0_0xffffffff";
+        String releasingBundle2 = "public/releasing2/0xfffffff0_0xffffffff";
         String splittingBundle = bundle;
-        String assigningBundle = "public/assigning/0xfffffff0_0xffffffff";
+        String assigningBundle1 = "public/assigning1/0xfffffff0_0xffffffff";
+        String assigningBundle2 = "public/assigning2/0xfffffff0_0xffffffff";
         String freeBundle = "public/free/0xfffffff0_0xffffffff";
         String deletedBundle = "public/deleted/0xfffffff0_0xffffffff";
-        String ownedBundle = "public/owned/0xfffffff0_0xffffffff";
-        overrideTableViews(releasingBundle,
-                new ServiceUnitStateData(Releasing, null, broker, 1));
+        String ownedBundle1 = "public/owned1/0xfffffff0_0xffffffff";
+        String ownedBundle2 = 
"public/owned2SourceBundle/0xfffffff0_0xffffffff";
+        String ownedBundle3 = "public/owned3/0xfffffff0_0xffffffff";
+        String inactiveBroker = "broker-inactive-1";
+        overrideTableViews(releasingBundle1,
+                new ServiceUnitStateData(Releasing, broker, brokerId2, 1));
+        overrideTableViews(releasingBundle2,
+                new ServiceUnitStateData(Releasing, brokerId2, brokerId3, 1));
         overrideTableViews(splittingBundle,
                 new ServiceUnitStateData(Splitting, null, broker,
                         Map.of(childBundle1Range, Optional.empty(),
                                 childBundle2Range, Optional.empty()), 1));
-        overrideTableViews(assigningBundle,
+        overrideTableViews(assigningBundle1,
                 new ServiceUnitStateData(Assigning, broker, null, 1));
+        overrideTableViews(assigningBundle2,
+                new ServiceUnitStateData(Assigning, broker, brokerId2, 1));
         overrideTableViews(freeBundle,
                 new ServiceUnitStateData(Free, null, broker, 1));
         overrideTableViews(deletedBundle,
                 new ServiceUnitStateData(Deleted, null, broker, 1));
-        overrideTableViews(ownedBundle,
+        overrideTableViews(ownedBundle1,
                 new ServiceUnitStateData(Owned, broker, null, 1));
+        overrideTableViews(ownedBundle2,
+                new ServiceUnitStateData(Owned, broker, inactiveBroker, 1));
+        overrideTableViews(ownedBundle3,
+                new ServiceUnitStateData(Owned, inactiveBroker, broker, 1));
+
 
         // test stable metadata state
         doReturn(CompletableFuture.completedFuture(Optional.of(brokerId2)))
@@ -1598,16 +1617,33 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         FieldUtils.writeDeclaredField(followerChannel, 
"inFlightStateWaitingTimeInMillis",
                 -1, true);
         ((ServiceUnitStateChannelImpl) leaderChannel)
-                .monitorOwnerships(List.of(brokerId1, brokerId2));
+                .monitorOwnerships(List.of(brokerId1, brokerId2, "broker-3"));
 
-        waitUntilNewOwner(channel2, releasingBundle, broker);
-        waitUntilNewOwner(channel2, childBundle11, broker);
-        waitUntilNewOwner(channel2, childBundle12, broker);
-        waitUntilNewOwner(channel2, assigningBundle, brokerId2);
-        waitUntilNewOwner(channel2, ownedBundle, broker);
-        assertEquals(Optional.empty(), 
channel2.getOwnerAsync(freeBundle).get());
-        
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
+        ServiceUnitStateChannel finalLeaderChannel = leaderChannel;
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
getCleanupJobs(finalLeaderChannel).isEmpty());
+
+
+        waitUntilNewOwner(channel2, releasingBundle1, brokerId2);
+        waitUntilNewOwner(channel2, releasingBundle2, brokerId2);
         assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());
+        waitUntilNewOwner(channel2, childBundle11, brokerId2);
+        waitUntilNewOwner(channel2, childBundle12, brokerId2);
+        waitUntilNewOwner(channel2, assigningBundle1, brokerId2);
+        waitUntilNewOwner(channel2, assigningBundle2, brokerId2);
+        assertTrue(channel2.getOwnerAsync(freeBundle).get().isEmpty());
+        
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
+        waitUntilNewOwner(channel2, ownedBundle1, broker);
+        waitUntilNewOwner(channel2, ownedBundle2, broker);
+        waitUntilNewOwner(channel2, ownedBundle3, brokerId2);
+
+        validateMonitorCounters(leaderChannel,
+                1,
+                0,
+                6,
+                0,
+                1,
+                0,
+                0);
 
         // clean-up
         FieldUtils.writeDeclaredField(channel1,

Reply via email to