This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9ca52411ea665d2292d12a84b0e598475792515f Author: ken <[email protected]> AuthorDate: Wed Dec 10 22:56:28 2025 +0800 [fix][broker] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService (#24980) Co-authored-by: fanjianye <[email protected]> (cherry picked from commit 47b8d5d86f1d19c324483d25ad8ed01679389eb9) --- .../SystemTopicBasedTopicPoliciesService.java | 77 +++++---- .../SystemTopicBasedTopicPoliciesServiceTest.java | 192 +++++++++++++++++++++ 2 files changed, 232 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 8287583a3d7..c3d88b9c723 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -570,42 +570,53 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return CompletableFuture.completedFuture(false); } return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) - .thenCompose(namespacePolicies -> { - if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) { - log.info("[{}] skip prepare init policies cache since the namespace is deleted", - namespace); - return CompletableFuture.completedFuture(false); - } + .thenCompose(namespacePolicies -> { + if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) { + log.info("[{}] skip prepare init policies cache since the namespace is deleted", + namespace); + return CompletableFuture.completedFuture(false); + } - return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { - final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = - newReader(namespace); - final CompletableFuture<Void> initFuture = readerCompletableFuture - .thenCompose(reader -> { - final CompletableFuture<Void> stageFuture = new CompletableFuture<>(); - initPolicesCache(reader, stageFuture); - return stageFuture - // Read policies in background - .thenAccept(__ -> readMorePoliciesAsync(reader)); - }); - initFuture.exceptionallyAsync(ex -> { + CompletableFuture<Void> initNamespacePolicyFuture = new CompletableFuture<>(); + CompletableFuture<Void> existingFuture = + policyCacheInitMap.putIfAbsent(namespace, initNamespacePolicyFuture); + if (existingFuture == null) { + final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = + newReader(namespace); + readerCompletableFuture + .thenCompose(reader -> { + final CompletableFuture<Void> stageFuture = new CompletableFuture<>(); + initPolicesCache(reader, stageFuture); + return stageFuture + // Read policies in background + .thenAccept(__ -> readMorePoliciesAsync(reader)); + }).thenApply(__ -> { + initNamespacePolicyFuture.complete(null); + return null; + }).exceptionally(ex -> { try { - if (closed.get()) { - return null; + if (readerCompletableFuture.isCompletedExceptionally()) { + log.error("[{}] Failed to create reader on __change_events topic", + namespace, ex); + initNamespacePolicyFuture.completeExceptionally(ex); + cleanPoliciesCacheInitMap(namespace, true); + } else { + initNamespacePolicyFuture.completeExceptionally(ex); + cleanPoliciesCacheInitMap(namespace, isAlreadyClosedException(ex)); } - cleanPoliciesCacheInitMap( - namespace, readerCompletableFuture.isCompletedExceptionally()); } catch (Throwable cleanupEx) { // Adding this catch to avoid break callback chain log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx); } return null; - }, pulsarService.getExecutor()); - // let caller know we've got an exception. - return initFuture; - }).thenApply(__ -> true); - }); + }); + + return initNamespacePolicyFuture.thenApply(__ -> true); + } else { + return existingFuture.thenApply(__ -> true); + } + }); } private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> newReader(NamespaceName ns) { @@ -614,10 +625,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return createSystemTopicClient(ns); } - if (existingFuture.isDone() && existingFuture.isCompletedExceptionally()) { - return existingFuture.exceptionallyCompose(ex -> - isAlreadyClosedException(ex) ? existingFuture : createSystemTopicClient(ns)); - } return existingFuture; }); } @@ -687,8 +694,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic log.error("[{}] Failed to check the move events for the system topic", reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(ex); - cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(), - isAlreadyClosedException(ex)); return; } if (hasMore) { @@ -707,8 +712,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic log.error("[{}] Failed to read event from the system topic.", reader.getSystemTopic().getTopicName(), e); future.completeExceptionally(e); - cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(), - isAlreadyClosedException(ex)); return null; }); } else { @@ -734,8 +737,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic }); } - - private void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace, boolean closeReader) { + @VisibleForTesting + void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace, boolean closeReader) { if (!closeReader) { policyCacheInitMap.remove(namespace); return; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 4326f83d763..a14d268e8ac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -19,11 +19,18 @@ package org.apache.pulsar.broker.service; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertTrue; import java.time.Duration; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -38,6 +45,10 @@ import java.util.concurrent.Executors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -469,4 +480,185 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic Assert.assertNotNull(topicPolicies); Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10); } + + @Test + public void testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws Exception { + // catch the log output in SystemTopicBasedTopicPoliciesService + Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class); + List<String> logMessages = new ArrayList<>(); + AbstractAppender appender = new AbstractAppender("TestAppender", null, null) { + @Override + public void append(LogEvent event) { + logMessages.add(event.getMessage().getFormattedMessage()); + } + }; + appender.start(); + logger.addAppender(appender); + + // create namespace-5 and topic + SystemTopicBasedTopicPoliciesService spyService = + Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar)); + FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true); + + + admin.namespaces().createNamespace(NAMESPACE5); + final String topic = "persistent://" + NAMESPACE5 + "/test" + UUID.randomUUID(); + admin.topics().createPartitionedTopic(topic, 1); + + CompletableFuture<Void> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); + Assert.assertNull(future); + + // mock readerCache and new a reader, then put this reader in readerCache. + // when new reader, would trigger __change_event topic of namespace-5 created + // and would trigger prepareInitPoliciesCacheAsync() + ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> + spyReaderCaches = new ConcurrentHashMap<>(); + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = + spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5)); + spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture); + FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true); + + // set topic policy. create producer for __change_event topic + admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1); + future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); + Assert.assertNotNull(future); + + // trigger close reader of __change_event directly, simulate that reader + // is closed for some reason, such as topic unload or broker restart. + // since prepareInitPoliciesCacheAsync() has been executed, it would go into readMorePoliciesAsync(), + // throw exception, output "Closing the topic policies reader for" and do cleanPoliciesCacheInitMap() + SystemTopicClient.Reader<PulsarEvent> reader = readerCompletableFuture.get(); + reader.close(); + log.info("successfully close spy reader"); + Awaitility.await().untilAsserted(() -> { + boolean logFound = logMessages.stream() + .anyMatch(msg -> msg.contains("Closing the topic policies reader for")); + assertTrue(logFound); + }); + + + // Since cleanPoliciesCacheInitMap() is executed, should add the failed reader into readerCache again. + // Then in SystemTopicBasedTopicPoliciesService, readerCache has a closed reader, + // and policyCacheInitMap do not contain a future. + // To simulate the situation: when getTopicPolicy() execute, it will do prepareInitPoliciesCacheAsync() and + // use a closed reader to read the __change_event topic. Then throw exception + spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture); + FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true); + + CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>(); + try { + prepareFuture = spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5)); + prepareFuture.get(); + Assert.fail(); + } catch (Exception e) { + // that is ok + } + + // since prepareInitPoliciesCacheAsync() throw exception when initPolicesCache(), + // would clean readerCache and policyCacheInitMap. + Assert.assertTrue(prepareFuture.isCompletedExceptionally()); + Awaitility.await().untilAsserted(() -> { + CompletableFuture<Void> future1 = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); + Assert.assertNull(future1); + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 = + spyReaderCaches.get(NamespaceName.get(NAMESPACE5)); + Assert.assertNull(readerCompletableFuture1); + }); + + + // make sure not do cleanPoliciesCacheInitMap() twice + // totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanPoliciesCacheInitMap() is 2. + // in previous code, the time would be 3 + boolean logFound = logMessages.stream() + .anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic")); + assertFalse(logFound); + boolean logFound2 = logMessages.stream() + .anyMatch(msg -> msg.contains("Failed to check the move events for the system topic")); + assertTrue(logFound2); + verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(), anyBoolean()); + + // make sure not occur Recursive update + boolean logFound3 = logMessages.stream() + .anyMatch(msg -> msg.contains("Recursive update")); + assertFalse(logFound3); + + // clean log appender + appender.stop(); + logger.removeAppender(appender); + } + + @Test + public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws Exception { + // catch the log output in SystemTopicBasedTopicPoliciesService + Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class); + List<String> logMessages = new ArrayList<>(); + AbstractAppender appender = new AbstractAppender("TestAppender", null, null) { + @Override + public void append(LogEvent event) { + logMessages.add(event.getMessage().getFormattedMessage()); + } + }; + appender.start(); + logger.addAppender(appender); + + // create namespace-5 and topic + SystemTopicBasedTopicPoliciesService spyService = + Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar)); + FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true); + + + admin.namespaces().createNamespace(NAMESPACE5); + final String topic = "persistent://" + NAMESPACE5 + "/test" + UUID.randomUUID(); + admin.topics().createPartitionedTopic(topic, 1); + + CompletableFuture<Void> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); + Assert.assertNull(future); + + // mock readerCache and put a failed readerCreateFuture in readerCache. + // simulate that when trigger prepareInitPoliciesCacheAsync(), + // it would use this failed readerFuture and go into corresponding logic + ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> + spyReaderCaches = new ConcurrentHashMap<>(); + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = new CompletableFuture<>(); + readerCompletableFuture.completeExceptionally(new Exception("create reader fail")); + spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture); + FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true); + + // trigger prepareInitPoliciesCacheAsync() + CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>(); + try { + prepareFuture = spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5)); + prepareFuture.get(); + Assert.fail(); + } catch (Exception e) { + // that is ok + } + + // since prepareInitPoliciesCacheAsync() throw exception when createReader, + // would clean readerCache and policyCacheInitMap. + Assert.assertTrue(prepareFuture.isCompletedExceptionally()); + Awaitility.await().untilAsserted(() -> { + CompletableFuture<Void> future1 = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); + Assert.assertNull(future1); + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 = + spyReaderCaches.get(NamespaceName.get(NAMESPACE5)); + Assert.assertNull(readerCompletableFuture1); + }); + + + // make sure not do cleanPoliciesCacheInitMap() twice + // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1. + boolean logFound = logMessages.stream() + .anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic")); + assertTrue(logFound); + boolean logFound2 = logMessages.stream() + .anyMatch(msg -> msg.contains("Failed to check the move events for the system topic") + || msg.contains("Failed to read event from the system topic")); + assertFalse(logFound2); + verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean()); + + // clean log appender + appender.stop(); + logger.removeAppender(appender); + } }
