This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 93b6d20  Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl (#2399)
93b6d20 is described below

commit 93b6d209e8c1bfdc3c84eb066dc46da0bed165c5
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Aug 20 10:39:54 2018 -0700

    Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl (#2399)

    ### Motivation

    With delayed acks enabled (the default), there is a potential race condition that can lead to an NPE:

    ```
    java.lang.NullPointerException
        at org.apache.pulsar.client.impl.ConsumerImpl.getClientCnx(ConsumerImpl.java:1446)
        at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flush(PersistentAcknowledgmentsGroupingTracker.java:154)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        ...
    ```

    The reason is that the delayed ack commit task gets scheduled (e.g. in 100ms) and might be executed before the main thread has finished initializing the `ConsumerImpl` instance.

    ### Modifications

    Reordered the initialization in the `ConsumerImpl` constructor to make sure `connectionHandler` is already set when we create the `PersistentAcknowledgmentsGroupingTracker` instance.
---
 .../org/apache/pulsar/client/impl/ConsumerImpl.java    | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1c69c75..1a7b67b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -160,15 +160,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
 
-        TopicName topicName = TopicName.get(topic);
-        if (topicName.isPersistent()) {
-            this.acknowledgmentsGroupingTracker =
-                new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
-        } else {
-            this.acknowledgmentsGroupingTracker =
-                NonPersistentAcknowledgmentGroupingTracker.of();
-        }
-
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
         } else {
@@ -203,6 +194,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
                 this);
 
+        TopicName topicName = TopicName.get(topic);
+        if (topicName.isPersistent()) {
+            this.acknowledgmentsGroupingTracker =
+                new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
+        } else {
+            this.acknowledgmentsGroupingTracker =
+                NonPersistentAcknowledgmentGroupingTracker.of();
+        }
+
         grabCnx();
     }
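
The race described in the Motivation section can be reproduced in isolation. The following is a minimal sketch, not the Pulsar code: `InitOrderSketch`, `Owner`, `Handler`, `Tracker`, and `outcome` are hypothetical names standing in for `ConsumerImpl`, the connection handler, and the grouping tracker. To make the outcome deterministic, the "buggy" ordering deliberately waits for the scheduled task to run before it assigns the handler, which simulates a constructor that is slower than the scheduled flush.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class InitOrderSketch {

    /** Hypothetical stand-in for the connection handler. */
    static final class Handler {
        String cnx() { return "cnx"; }
    }

    /** Hypothetical stand-in for the tracker: its constructor schedules a task that calls back into the owner. */
    static final class Tracker {
        final Future<String> firstFlush;

        Tracker(Owner owner, ScheduledExecutorService executor) {
            // `owner` may still be under construction; the task can run before its constructor returns.
            Callable<String> flush = owner::currentCnx;
            firstFlush = executor.schedule(flush, 0, TimeUnit.MILLISECONDS);
        }
    }

    /** Hypothetical stand-in for the consumer: passes `this` to the tracker from its constructor. */
    static final class Owner {
        private Handler handler;
        private Tracker tracker;

        Owner(ScheduledExecutorService executor, boolean handlerFirst) {
            if (handlerFirst) {
                // Fixed order: everything the task depends on is set before `this` escapes.
                this.handler = new Handler();
                this.tracker = new Tracker(this, executor);
            } else {
                // Buggy order: `this` escapes while `handler` is still null.
                this.tracker = new Tracker(this, executor);
                awaitDone(tracker.firstFlush); // simulate the constructor losing the race
                this.handler = new Handler();
            }
        }

        String currentCnx() {
            return handler.cnx(); // throws NullPointerException if handler is not yet set
        }
    }

    private static void awaitDone(Future<?> future) {
        try {
            future.get();
        } catch (Exception ignored) {
            // The failure is inspected later through the same future.
        }
    }

    /** Returns the task's result, or the simple name of the exception it threw. */
    static String outcome(boolean handlerFirst) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            Owner owner = new Owner(executor, handlerFirst);
            return owner.tracker.firstFlush.get();
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("handler first: " + outcome(true));
        System.out.println("tracker first: " + outcome(false));
    }
}
```

In this sketch the only difference between the two branches is the order of the two assignments, which mirrors the reordering in the diff above: the object that schedules background work is created last, after every field the scheduled task reads has been assigned.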