This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new b68606a  [Java Client] Fix concurrency issue in incrementing epoch 
(#10278) (#10436)
b68606a is described below

commit b68606a2c38323ead2fe2524a97507b3a1654659
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Thu Apr 29 14:50:10 2021 +0300

    [Java Client] Fix concurrency issue in incrementing epoch (#10278) (#10436)
---
 .../org/apache/pulsar/client/impl/ConnectionHandler.java   | 14 +++++++++++---
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java   |  2 +-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index a9a8f7c..8802178 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.impl;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -35,7 +37,9 @@ public class ConnectionHandler {
 
     protected final HandlerState state;
     protected final Backoff backoff;
-    protected long epoch = 0L;
+    private static final AtomicLongFieldUpdater<ConnectionHandler> 
EPOCH_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ConnectionHandler.class, "epoch");
+    private volatile long epoch = 0L;
     protected volatile long lastConnectionClosedTimestamp = 0L;
 
     interface Connection {
@@ -104,11 +108,15 @@ public class ConnectionHandler {
         state.setState(State.Connecting);
         state.client.timer().newTimeout(timeout -> {
             log.info("[{}] [{}] Reconnecting after connection was closed", 
state.topic, state.getHandlerName());
-            ++epoch;
+            incrementEpoch();
             grabCnx();
         }, delayMs, TimeUnit.MILLISECONDS);
     }
 
+    protected long incrementEpoch() {
+        return EPOCH_UPDATER.incrementAndGet(this);
+    }
+
     @VisibleForTesting
     public void connectionClosed(ClientCnx cnx) {
         lastConnectionClosedTimestamp = System.currentTimeMillis();
@@ -124,7 +132,7 @@ public class ConnectionHandler {
                     delayMs / 1000.0);
             state.client.timer().newTimeout(timeout -> {
                 log.info("[{}] [{}] Reconnecting after timeout", state.topic, 
state.getHandlerName());
-                ++epoch;
+                incrementEpoch();
                 grabCnx();
             }, delayMs, TimeUnit.MILLISECONDS);
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 528efca..786af84 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1287,7 +1287,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
         cnx.sendRequestWithId(
                 Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata,
-                       schemaInfo, connectionHandler.epoch, 
userProvidedProducerName),
+                       schemaInfo, connectionHandler.getEpoch(), 
userProvidedProducerName),
                 requestId).thenAccept(response -> {
                     String producerName = response.getProducerName();
                     long lastSequenceId = response.getLastSequenceId();

Reply via email to