This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 00161516c25fa5646cd9007d5232c80e5c058b51
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Thu Nov 18 13:44:42 2021 +0800

    Fix TopicPoliciesCacheNotInitException issue. (#12773)
    
    Sometimes, we may get a `TopicPoliciesCacheNotInitException` with the stack 
trace below:
    ```
    15:45:47.020 [pulsar-web-41-3] INFO  org.eclipse.jetty.server.RequestLog - 
10.0.0.42 - - [10/Nov/2021:15:45:47 +0000] "GET /status.html HTTP/1.1" 200 2 
"-" "kube-probe/1.19+" 1
    15:45:51.221 [pulsar-2-15] ERROR 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to 
perform getRetention on topic persistent://public/default/UpdateNodeCharts
    java.lang.RuntimeException: 
org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException:
 Topic policies cache have not init.
        at 
org.apache.pulsar.broker.service.TopicPoliciesService.lambda$getTopicPoliciesAsyncWithRetry$0(TopicPoliciesService.java:84)
 ~[io.streamnative-pulsar-broker-2.8.1.21.jar:2.8.1.21]
        at 
org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:50) 
~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
        at 
org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$1(RetryUtil.java:63)
 ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final]
        at java.lang.Thread.run(Thread.java:829) [?:?]
    ```
    
    This is caused by the following code: 
https://github.com/apache/pulsar/blob/c3da1452a444c9599cb85562a3faa82ddfdecec8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L298-L312
    
    When `reader.readNextAsync()` completes exceptionally, `msg` is null, but the 
callback still goes on to use it, which throws an NPE that nothing catches.
    
    (cherry picked from commit 11298144ac118cda951deffa092ab17110d254b7)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 42 ++++++++++++++--------
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 40 +++++++++++++++++++++
 .../coordinator/impl/MLTransactionLogImpl.java     |  8 +++--
 3 files changed, 74 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 292a35c..bea4e8e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -171,6 +171,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     @Override
     public TopicPolicies getTopicPolicies(TopicName topicName) throws 
TopicPoliciesCacheNotInitException {
+        if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
+            NamespaceName namespace = topicName.getNamespaceObject();
+            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+        }
         if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
                 && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
             throw new TopicPoliciesCacheNotInitException();
@@ -208,24 +212,29 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 result.complete(null);
             } else {
                 ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
-                policyCacheInitMap.put(namespace, false);
-                CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                        creatSystemTopicClientWithRetry(namespace);
-                readerCaches.put(namespace, readerCompletableFuture);
-                readerCompletableFuture.whenComplete((reader, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Failed to create reader on 
__change_events topic", namespace, ex);
-                        result.completeExceptionally(ex);
-                    } else {
-                        initPolicesCache(reader, result);
-                        result.thenRun(() -> readMorePolicies(reader));
-                    }
-                });
+                prepareInitPoliciesCache(namespace, result);
             }
         }
         return result;
     }
 
+    private void prepareInitPoliciesCache(NamespaceName namespace, 
CompletableFuture<Void> result) {
+        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
+            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+                    creatSystemTopicClientWithRetry(namespace);
+            readerCaches.put(namespace, readerCompletableFuture);
+            readerCompletableFuture.whenComplete((reader, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
+                    result.completeExceptionally(ex);
+                } else {
+                    initPolicesCache(reader, result);
+                    result.thenRun(() -> readMorePolicies(reader));
+                }
+            });
+        }
+    }
+
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
creatSystemTopicClientWithRetry(
             NamespaceName namespace) {
         SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
@@ -292,6 +301,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
                 
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                reader.closeAsync();
+                return;
             }
             if (hasMore) {
                 reader.readNextAsync().whenComplete((msg, e) -> {
@@ -300,6 +312,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                 reader.getSystemTopic().getTopicName(), ex);
                         future.completeExceptionally(e);
                         
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                        
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                        reader.closeAsync();
+                        return;
                     }
                     refreshTopicPoliciesCache(msg);
                     if (log.isDebugEnabled()) {
@@ -314,7 +329,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 }
                 policyCacheInitMap.computeIfPresent(
                         
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
-
                 // replay policy message
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 49d8699..80f3dc9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -32,8 +32,12 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
@@ -279,4 +283,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
 
         assertEquals(reader1, reader);
     }
+
+    @Test
+    public void testGetTopicPoliciesWithRetry() throws Exception {
+        Field initMapField = 
SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
+        initMapField.setAccessible(true);
+        Map<NamespaceName, Boolean> initMap = 
(Map)initMapField.get(systemTopicBasedTopicPoliciesService);
+        initMap.remove(NamespaceName.get(NAMESPACE1));
+        Field readerCaches = 
SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches");
+        readerCaches.setAccessible(true);
+        Map<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers = 
(Map)readerCaches.get(systemTopicBasedTopicPoliciesService);
+        readers.remove(NamespaceName.get(NAMESPACE1));
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(500, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
+                .setMax(1000, TimeUnit.MILLISECONDS)
+                .create();
+        TopicPolicies initPolicy = TopicPolicies.builder()
+                .maxConsumerPerTopic(10)
+                .build();
+        ScheduledExecutorService executors = 
Executors.newScheduledThreadPool(1);
+        executors.schedule(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    
systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, 
initPolicy).get();
+                } catch (Exception ignore) {}
+            }
+        }, 2000, TimeUnit.MILLISECONDS);
+        Awaitility.await().untilAsserted(() -> {
+            Optional<TopicPolicies> topicPolicies = 
systemTopicBasedTopicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1, 
backoff, pulsar.getExecutor()).get();
+            Assert.assertTrue(topicPolicies.isPresent());
+            if (topicPolicies.isPresent()) {
+                Assert.assertEquals(topicPolicies.get(), initPolicy);
+            }
+        });
+    }
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index f2324af..c044275 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -73,8 +73,7 @@ public class MLTransactionLogImpl implements TransactionLog {
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
                                 ManagedLedgerConfig managedLedgerConfig) {
-        this.topicName = TopicName.get(TopicDomain.persistent.value(),
-                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + 
tcID.getId());
+        this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
         this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
         
managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
@@ -83,6 +82,11 @@ public class MLTransactionLogImpl implements TransactionLog {
         this.entryQueue = new SpscArrayQueue<>(2000);
     }
 
+    public static TopicName getMLTransactionLogName(TransactionCoordinatorID 
tcID) {
+        return TopicName.get(TopicDomain.persistent.value(),
+                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + 
tcID.getId());
+    }
+
     @Override
     public CompletableFuture<Void> initialize() {
         CompletableFuture<Void> future = new CompletableFuture<>();

Reply via email to