This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bde5ac7fa0e [fix][broker] Make timestamp fields thread safe by using volatile (#17252)
bde5ac7fa0e is described below

commit bde5ac7fa0ec438430b5fb1912a74bcef0d73445
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Sep 28 18:24:26 2022 +0300

    [fix][broker] Make timestamp fields thread safe by using volatile (#17252)
    
    - fixes an issue with stats where timestamps might be inconsistent because of visibility issues
      - fields should be volatile to ensure visibility of updated values in a consistent manner
    
    - in replication, the lastDataMessagePublishedTimestamp field in PersistentTopic might be inconsistent
      unless volatile is used
---
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java  |  4 ++--
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  |  8 ++++----
 .../main/java/org/apache/pulsar/broker/service/Consumer.java   |  6 +++---
 .../broker/service/persistent/PersistentSubscription.java      |  6 +++---
 .../pulsar/broker/service/persistent/PersistentTopic.java      |  2 +-
 .../service/persistent/ReplicatedSubscriptionsController.java  |  2 +-
 .../java/org/apache/pulsar/compaction/CompactionRecord.java    | 10 +++++-----
 .../apache/pulsar/client/impl/auth/AuthenticationAthenz.java   |  2 +-
 .../org/apache/pulsar/client/impl/AutoClusterFailover.java     |  4 ++--
 9 files changed, 22 insertions(+), 22 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index fe1798a0d7e..b67c74fdc79 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -260,11 +260,11 @@ public class ManagedCursorImpl implements ManagedCursor {
         AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, 
"pendingMarkDeletedSubmittedCount");
     @SuppressWarnings("unused")
     private volatile int pendingMarkDeletedSubmittedCount = 0;
-    private long lastLedgerSwitchTimestamp;
+    private volatile long lastLedgerSwitchTimestamp;
     private final Clock clock;
 
     // The last active time (Unix time, milliseconds) of the cursor
-    private long lastActive;
+    private volatile long lastActive;
 
     public enum State {
         Uninitialized, // Cursor is being initialized
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index aa98d258bcb..254ee767bc7 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -215,13 +215,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     private volatile LedgerHandle currentLedger;
     private long currentLedgerEntries = 0;
     private long currentLedgerSize = 0;
-    private long lastLedgerCreatedTimestamp = 0;
-    private long lastLedgerCreationFailureTimestamp = 0;
+    private volatile long lastLedgerCreatedTimestamp = 0;
+    private volatile long lastLedgerCreationFailureTimestamp = 0;
     private long lastLedgerCreationInitiationTimestamp = 0;
 
     private long lastOffloadLedgerId = 0;
-    private long lastOffloadSuccessTimestamp = 0;
-    private long lastOffloadFailureTimestamp = 0;
+    private volatile long lastOffloadSuccessTimestamp = 0;
+    private volatile long lastOffloadFailureTimestamp = 0;
 
     private int minBacklogCursorsForCaching = 0;
     private int minBacklogEntriesForCaching = 1000;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index cc47976e82a..767c7bb9274 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -86,9 +86,9 @@ public class Consumer {
     private final LongAdder bytesOutCounter;
     private final Rate messageAckRate;
 
-    private long lastConsumedTimestamp;
-    private long lastAckedTimestamp;
-    private long lastConsumedFlowTimestamp;
+    private volatile long lastConsumedTimestamp;
+    private volatile long lastAckedTimestamp;
+    private volatile long lastConsumedFlowTimestamp;
     private Rate chunkedMessageRate;
 
     // Represents how many messages we can safely send to the consumer without
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 4afc5b6bdf9..855bec48527 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -107,9 +107,9 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
     private volatile int isFenced = FALSE;
     private PersistentMessageExpiryMonitor expiryMonitor;
 
-    private long lastExpireTimestamp = 0L;
-    private long lastConsumedFlowTimestamp = 0L;
-    private long lastMarkDeleteAdvancedTimestamp = 0L;
+    private volatile long lastExpireTimestamp = 0L;
+    private volatile long lastConsumedFlowTimestamp = 0L;
+    private volatile long lastMarkDeleteAdvancedTimestamp = 0L;
 
     // for connected subscriptions, message expiry will be checked if the 
backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 33d97970569..fdcaaf2ffbd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -224,7 +224,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     protected final TransactionBuffer transactionBuffer;
 
     // Record the last time a data message (ie: not an internal Pulsar marker) 
is published on the topic
-    private long lastDataMessagePublishedTimestamp = 0;
+    private volatile long lastDataMessagePublishedTimestamp = 0;
 
     private static class TopicStatsHelper {
         public double averageMsgSize;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 1e1245ed36b..cf46fac4a24 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -59,7 +59,7 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
     private final String localCluster;
 
     // The timestamp of when the last snapshot was initiated
-    private long lastCompletedSnapshotStartTime = 0;
+    private volatile long lastCompletedSnapshotStartTime = 0;
 
     private String lastCompletedSnapshotId;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
index 4a8274389eb..b6ae55b00b8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
@@ -30,16 +30,16 @@ public class CompactionRecord {
             200_000, 1000_000 };
 
     @Getter
-    private long lastCompactionRemovedEventCount = 0L;
+    private volatile long lastCompactionRemovedEventCount = 0L;
     @Getter
-    private long lastCompactionSucceedTimestamp = 0L;
+    private volatile long lastCompactionSucceedTimestamp = 0L;
     @Getter
-    private long lastCompactionFailedTimestamp = 0L;
+    private volatile long lastCompactionFailedTimestamp = 0L;
     @Getter
-    private long lastCompactionDurationTimeInMills = 0L;
+    private volatile long lastCompactionDurationTimeInMills = 0L;
 
     private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
-    private long lastCompactionStartTimeOp;
+    private volatile long lastCompactionStartTimeOp;
 
     private final LongAdder compactionRemovedEventCount = new LongAdder();
     private final LongAdder compactionSucceedCount = new LongAdder();
diff --git 
a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
 
b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
index a10bee1711c..8c719b861b0 100644
--- 
a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
+++ 
b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
@@ -65,7 +65,7 @@ public class AuthenticationAthenz implements Authentication, 
EncodedAuthenticati
     // ZTSClient.cancelPrefetch() is called.
     // cf. https://github.com/AthenZ/athenz/issues/544
     private boolean autoPrefetchEnabled = false;
-    private long cachedRoleTokenTimestamp;
+    private volatile long cachedRoleTokenTimestamp;
     private String roleToken;
     // athenz will only give this token if it's at least valid for 2hrs
     private static final int minValidity = 2 * 60 * 60;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 1f34de71979..7138c79bd9e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -58,8 +58,8 @@ public class AutoClusterFailover implements 
ServiceUrlProvider {
     private final long failoverDelayNs;
     private final long switchBackDelayNs;
     private final ScheduledExecutorService executor;
-    private long recoverTimestamp;
-    private long failedTimestamp;
+    private volatile long recoverTimestamp;
+    private volatile long failedTimestamp;
     private final long intervalMs;
     private static final int TIMEOUT = 30_000;
     private final PulsarServiceNameResolver resolver;

Reply via email to