This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 00161516c25fa5646cd9007d5232c80e5c058b51 Author: Jiwei Guo <techno...@apache.org> AuthorDate: Thu Nov 18 13:44:42 2021 +0800 Fix TopicPoliciesCacheNotInitException issue. (#12773) Sometimes we may get a `TopicPoliciesCacheNotInitException` with the stack trace below: ``` 15:45:47.020 [pulsar-web-41-3] INFO org.eclipse.jetty.server.RequestLog - 10.0.0.42 - - [10/Nov/2021:15:45:47 +0000] "GET /status.html HTTP/1.1" 200 2 "-" "kube-probe/1.19+" 1 15:45:51.221 [pulsar-2-15] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to perform getRetention on topic persistent://public/default/UpdateNodeCharts java.lang.RuntimeException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init. at org.apache.pulsar.broker.service.TopicPoliciesService.lambda$getTopicPoliciesAsyncWithRetry$0(TopicPoliciesService.java:84) ~[io.streamnative-pulsar-broker-2.8.1.21.jar:2.8.1.21] at org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:50) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21] at org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$1(RetryUtil.java:63) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final] at java.lang.Thread.run(Thread.java:829) [?:?] 
``` This is caused by the following code: https://github.com/apache/pulsar/blob/c3da1452a444c9599cb85562a3faa82ddfdecec8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L298-L312 When `reader.readNextAsync()` fails with an exception, `msg` is null, which then causes an NPE that no catch block handles. As a result, the namespace stays marked as not initialized in `policyCacheInitMap`, so `getTopicPolicies` keeps throwing `TopicPoliciesCacheNotInitException`. (cherry picked from commit 11298144ac118cda951deffa092ab17110d254b7) --- .../SystemTopicBasedTopicPoliciesService.java | 42 ++++++++++++++-------- .../SystemTopicBasedTopicPoliciesServiceTest.java | 40 +++++++++++++++++++++ .../coordinator/impl/MLTransactionLogImpl.java | 8 +++-- 3 files changed, 74 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 292a35c..bea4e8e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -171,6 +171,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic @Override public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { + if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) { + NamespaceName namespace = topicName.getNamespaceObject(); + prepareInitPoliciesCache(namespace, new CompletableFuture<>()); + } if (policyCacheInitMap.containsKey(topicName.getNamespaceObject()) && !policyCacheInitMap.get(topicName.getNamespaceObject())) { throw new TopicPoliciesCacheNotInitException(); @@ -208,24 +212,29 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic result.complete(null); } else { ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); - policyCacheInitMap.put(namespace, false); - 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = - creatSystemTopicClientWithRetry(namespace); - readerCaches.put(namespace, readerCompletableFuture); - readerCompletableFuture.whenComplete((reader, ex) -> { - if (ex != null) { - log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); - result.completeExceptionally(ex); - } else { - initPolicesCache(reader, result); - result.thenRun(() -> readMorePolicies(reader)); - } - }); + prepareInitPoliciesCache(namespace, result); } } return result; } + private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) { + if (policyCacheInitMap.putIfAbsent(namespace, false) == null) { + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = + creatSystemTopicClientWithRetry(namespace); + readerCaches.put(namespace, readerCompletableFuture); + readerCompletableFuture.whenComplete((reader, ex) -> { + if (ex != null) { + log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); + result.completeExceptionally(ex); + } else { + initPolicesCache(reader, result); + result.thenRun(() -> readMorePolicies(reader)); + } + }); + } + } + protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry( NamespaceName namespace) { SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory @@ -292,6 +301,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(ex); readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + reader.closeAsync(); + return; } if (hasMore) { reader.readNextAsync().whenComplete((msg, e) -> { @@ -300,6 +312,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic 
reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(e); readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); + reader.closeAsync(); + return; } refreshTopicPoliciesCache(msg); if (log.isDebugEnabled()) { @@ -314,7 +329,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } policyCacheInitMap.computeIfPresent( reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); - // replay policy message policiesCache.forEach(((topicName, topicPolicies) -> { if (listeners.get(topicName) != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 49d8699..80f3dc9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -32,8 +32,12 @@ import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; @@ -279,4 +283,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic assertEquals(reader1, reader); } + + @Test + public void testGetTopicPoliciesWithRetry() throws Exception { + Field initMapField = 
SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap"); + initMapField.setAccessible(true); + Map<NamespaceName, Boolean> initMap = (Map)initMapField.get(systemTopicBasedTopicPoliciesService); + initMap.remove(NamespaceName.get(NAMESPACE1)); + Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches"); + readerCaches.setAccessible(true); + Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers = (Map)readerCaches.get(systemTopicBasedTopicPoliciesService); + readers.remove(NamespaceName.get(NAMESPACE1)); + Backoff backoff = new BackoffBuilder() + .setInitialTime(500, TimeUnit.MILLISECONDS) + .setMandatoryStop(5000, TimeUnit.MILLISECONDS) + .setMax(1000, TimeUnit.MILLISECONDS) + .create(); + TopicPolicies initPolicy = TopicPolicies.builder() + .maxConsumerPerTopic(10) + .build(); + ScheduledExecutorService executors = Executors.newScheduledThreadPool(1); + executors.schedule(new Runnable() { + @Override + public void run() { + try { + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); + } catch (Exception ignore) {} + } + }, 2000, TimeUnit.MILLISECONDS); + Awaitility.await().untilAsserted(() -> { + Optional<TopicPolicies> topicPolicies = systemTopicBasedTopicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor()).get(); + Assert.assertTrue(topicPolicies.isPresent()); + if (topicPolicies.isPresent()) { + Assert.assertEquals(topicPolicies.get(), initPolicy); + } + }); + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index f2324af..c044275 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -73,8 +73,7 @@ public class MLTransactionLogImpl implements TransactionLog { public MLTransactionLogImpl(TransactionCoordinatorID tcID, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig) { - this.topicName = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId()); + this.topicName = getMLTransactionLogName(tcID); this.tcId = tcID.getId(); this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor(); managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor); @@ -83,6 +82,11 @@ public class MLTransactionLogImpl implements TransactionLog { this.entryQueue = new SpscArrayQueue<>(2000); } + public static TopicName getMLTransactionLogName(TransactionCoordinatorID tcID) { + return TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId()); + } + @Override public CompletableFuture<Void> initialize() { CompletableFuture<Void> future = new CompletableFuture<>();