This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 271c6ee  Fixed initialization order of acknowledgmentsGroupingTracker 
in ConsumerImpl (#2399)
271c6ee is described below

commit 271c6ee56a007506020ecf317c88df95e4511714
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Aug 20 10:39:54 2018 -0700

    Fixed initialization order of acknowledgmentsGroupingTracker in 
ConsumerImpl (#2399)
    
    ### Motivation
    
    With delayed acks enabled (the default), there is a potential race 
condition that lead to a NPE:
    
    ```
    java.lang.NullPointerException
        at 
org.apache.pulsar.client.impl.ConsumerImpl.getClientCnx(ConsumerImpl.java:1446)
        at 
org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flush(PersistentAcknowledgmentsGroupingTracker.java:154)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        ...
    ```
    
    The reason is that the delayed ack commit task gets scheduled (eg: in 
100ms) and might be executed before the the main thread has finished 
initializing the `ConsumerImpl` instance.
    
    ### Modifications
    
    Reordered the initialization in `ConsumerImpl` constructor to make sure 
`connectionHandler` is already set when we create the 
`PersistentAcknowledgmentsGroupingTracker` instance.
---
 .../org/apache/pulsar/client/impl/ConsumerImpl.java    | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3e94724..da04534 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -160,15 +160,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = 
conf.getSubscriptionInitialPosition();
 
-        TopicName topicName = TopicName.get(topic);
-        if (topicName.isPersistent()) {
-            this.acknowledgmentsGroupingTracker =
-                new PersistentAcknowledgmentsGroupingTracker(this, conf, 
client.eventLoopGroup());
-        } else {
-            this.acknowledgmentsGroupingTracker =
-                NonPersistentAcknowledgmentGroupingTracker.of();
-        }
-
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
         } else {
@@ -203,6 +194,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, 
TimeUnit.MILLISECONDS),
             this);
 
+        TopicName topicName = TopicName.get(topic);
+        if (topicName.isPersistent()) {
+            this.acknowledgmentsGroupingTracker =
+                new PersistentAcknowledgmentsGroupingTracker(this, conf, 
client.eventLoopGroup());
+        } else {
+            this.acknowledgmentsGroupingTracker =
+                NonPersistentAcknowledgmentGroupingTracker.of();
+        }
+
         grabCnx();
     }
 

Reply via email to