This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2efef87e5d9 [fix][broker] Fix the deadlock when using 
BookieRackAffinityMapping with rackaware policy (#21481)
2efef87e5d9 is described below

commit 2efef87e5d936df604633ba65e7a0595f1d3bb9e
Author: erobot <[email protected]>
AuthorDate: Fri Nov 10 18:54:01 2023 +0800

    [fix][broker] Fix the deadlock when using BookieRackAffinityMapping with 
rackaware policy (#21481)
---
 .../rackawareness/BookieRackAffinityMapping.java   |  9 +--
 .../BookieRackAffinityMappingTest.java             | 68 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index d54ef2a5f4c..983822f2294 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -245,6 +245,7 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
 
         bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
                 .thenAccept(optVal -> {
+                    Set<BookieId> bookieIdSet = new HashSet<>();
                     synchronized (this) {
                         LOG.info("Bookie rack info updated to {}. Notifying 
rackaware policy.", optVal);
                         
this.updateRacksWithHost(optVal.orElseGet(BookiesRackConfiguration::new));
@@ -259,12 +260,12 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
                             LOG.debug("Bookies with rack update from {} to 
{}", bookieAddressListLastTime,
                                     bookieAddressList);
                         }
-                        Set<BookieId> bookieIdSet = new 
HashSet<>(bookieAddressList);
+                        bookieIdSet.addAll(bookieAddressList);
                         bookieIdSet.addAll(bookieAddressListLastTime);
                         bookieAddressListLastTime = bookieAddressList;
-                        if (rackawarePolicy != null) {
-                            rackawarePolicy.onBookieRackChange(new 
ArrayList<>(bookieIdSet));
-                        }
+                    }
+                    if (rackawarePolicy != null) {
+                        rackawarePolicy.onBookieRackChange(new 
ArrayList<>(bookieIdSet));
                     }
                 });
     }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index d7df5afb4be..9cd81604442 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.bookie.rackawareness;
 import static 
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -28,6 +29,7 @@ import io.netty.util.HashedWheelTimer;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,7 +37,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -46,6 +52,7 @@ import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieNode;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -55,6 +62,8 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
 import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient;
 import org.awaitility.Awaitility;
@@ -342,4 +351,63 @@ public class BookieRackAffinityMappingTest {
 
         timer.stop();
     }
+
+    @Test
+    public void testNoDeadlockWithRackawarePolicy() throws Exception {
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, 
store);
+
+        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+        
mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        mapping.setConf(bkClientConf);
+
+        @Cleanup("stop")
+        HashedWheelTimer timer = new HashedWheelTimer(new 
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+                bkClientConf.getTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
+                bkClientConf.getTimeoutTimerNumTicks());
+
+        RackawareEnsemblePlacementPolicy repp = new 
RackawareEnsemblePlacementPolicy();
+        repp.initialize(bkClientConf, Optional.of(mapping), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        mapping.registerRackChangeListener(repp);
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor1 = Executors.newSingleThreadExecutor();
+        @Cleanup("shutdownNow")
+        ExecutorService executor2 = Executors.newSingleThreadExecutor();
+
+        CountDownLatch count = new CountDownLatch(2);
+
+        executor1.submit(() -> {
+            try {
+                Method handleUpdates =
+                        
BookieRackAffinityMapping.class.getDeclaredMethod("handleUpdates", 
Notification.class);
+                handleUpdates.setAccessible(true);
+                Notification n =
+                        new Notification(NotificationType.Modified, 
BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+                long start = System.currentTimeMillis();
+                while (System.currentTimeMillis() - start < 2_000) {
+                    handleUpdates.invoke(mapping, n);
+                }
+                count.countDown();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        executor2.submit(() -> {
+            Set<BookieId> writableBookies = new HashSet<>();
+            writableBookies.add(BOOKIE1.toBookieId());
+            long start = System.currentTimeMillis();
+            while (System.currentTimeMillis() - start < 2_000) {
+                repp.onClusterChanged(writableBookies, Collections.emptySet());
+                repp.onClusterChanged(Collections.emptySet(), 
Collections.emptySet());
+            }
+            count.countDown();
+        });
+
+        assertTrue(count.await(3, TimeUnit.SECONDS));
+    }
 }

Reply via email to