This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bde5ac7fa0e [fix][broker] Make timestamp fields thread safe by using
volatile (#17252)
bde5ac7fa0e is described below
commit bde5ac7fa0ec438430b5fb1912a74bcef0d73445
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Sep 28 18:24:26 2022 +0300
[fix][broker] Make timestamp fields thread safe by using volatile (#17252)
- fixes issue with stats where timestamps might be inconsistent because of
visibility issues
- fields should be volatile to ensure visibility of updated values in a
consistent manner
- in replication, the lastDataMessagePublishedTimestamp field in
PersistentTopic might be inconsistent
unless volatile is used
---
.../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 ++--
.../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 ++++----
.../main/java/org/apache/pulsar/broker/service/Consumer.java | 6 +++---
.../broker/service/persistent/PersistentSubscription.java | 6 +++---
.../pulsar/broker/service/persistent/PersistentTopic.java | 2 +-
.../service/persistent/ReplicatedSubscriptionsController.java | 2 +-
.../java/org/apache/pulsar/compaction/CompactionRecord.java | 10 +++++-----
.../apache/pulsar/client/impl/auth/AuthenticationAthenz.java | 2 +-
.../org/apache/pulsar/client/impl/AutoClusterFailover.java | 4 ++--
9 files changed, 22 insertions(+), 22 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index fe1798a0d7e..b67c74fdc79 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -260,11 +260,11 @@ public class ManagedCursorImpl implements ManagedCursor {
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class,
"pendingMarkDeletedSubmittedCount");
@SuppressWarnings("unused")
private volatile int pendingMarkDeletedSubmittedCount = 0;
- private long lastLedgerSwitchTimestamp;
+ private volatile long lastLedgerSwitchTimestamp;
private final Clock clock;
// The last active time (Unix time, milliseconds) of the cursor
- private long lastActive;
+ private volatile long lastActive;
public enum State {
Uninitialized, // Cursor is being initialized
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index aa98d258bcb..254ee767bc7 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -215,13 +215,13 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
private volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
- private long lastLedgerCreatedTimestamp = 0;
- private long lastLedgerCreationFailureTimestamp = 0;
+ private volatile long lastLedgerCreatedTimestamp = 0;
+ private volatile long lastLedgerCreationFailureTimestamp = 0;
private long lastLedgerCreationInitiationTimestamp = 0;
private long lastOffloadLedgerId = 0;
- private long lastOffloadSuccessTimestamp = 0;
- private long lastOffloadFailureTimestamp = 0;
+ private volatile long lastOffloadSuccessTimestamp = 0;
+ private volatile long lastOffloadFailureTimestamp = 0;
private int minBacklogCursorsForCaching = 0;
private int minBacklogEntriesForCaching = 1000;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index cc47976e82a..767c7bb9274 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -86,9 +86,9 @@ public class Consumer {
private final LongAdder bytesOutCounter;
private final Rate messageAckRate;
- private long lastConsumedTimestamp;
- private long lastAckedTimestamp;
- private long lastConsumedFlowTimestamp;
+ private volatile long lastConsumedTimestamp;
+ private volatile long lastAckedTimestamp;
+ private volatile long lastConsumedFlowTimestamp;
private Rate chunkedMessageRate;
// Represents how many messages we can safely send to the consumer without
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 4afc5b6bdf9..855bec48527 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -107,9 +107,9 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
private volatile int isFenced = FALSE;
private PersistentMessageExpiryMonitor expiryMonitor;
- private long lastExpireTimestamp = 0L;
- private long lastConsumedFlowTimestamp = 0L;
- private long lastMarkDeleteAdvancedTimestamp = 0L;
+ private volatile long lastExpireTimestamp = 0L;
+ private volatile long lastConsumedFlowTimestamp = 0L;
+ private volatile long lastMarkDeleteAdvancedTimestamp = 0L;
// for connected subscriptions, message expiry will be checked if the
backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 33d97970569..fdcaaf2ffbd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -224,7 +224,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
protected final TransactionBuffer transactionBuffer;
// Record the last time a data message (ie: not an internal Pulsar marker)
is published on the topic
- private long lastDataMessagePublishedTimestamp = 0;
+ private volatile long lastDataMessagePublishedTimestamp = 0;
private static class TopicStatsHelper {
public double averageMsgSize;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 1e1245ed36b..cf46fac4a24 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -59,7 +59,7 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
private final String localCluster;
// The timestamp of when the last snapshot was initiated
- private long lastCompletedSnapshotStartTime = 0;
+ private volatile long lastCompletedSnapshotStartTime = 0;
private String lastCompletedSnapshotId;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
index 4a8274389eb..b6ae55b00b8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
@@ -30,16 +30,16 @@ public class CompactionRecord {
200_000, 1000_000 };
@Getter
- private long lastCompactionRemovedEventCount = 0L;
+ private volatile long lastCompactionRemovedEventCount = 0L;
@Getter
- private long lastCompactionSucceedTimestamp = 0L;
+ private volatile long lastCompactionSucceedTimestamp = 0L;
@Getter
- private long lastCompactionFailedTimestamp = 0L;
+ private volatile long lastCompactionFailedTimestamp = 0L;
@Getter
- private long lastCompactionDurationTimeInMills = 0L;
+ private volatile long lastCompactionDurationTimeInMills = 0L;
private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
- private long lastCompactionStartTimeOp;
+ private volatile long lastCompactionStartTimeOp;
private final LongAdder compactionRemovedEventCount = new LongAdder();
private final LongAdder compactionSucceedCount = new LongAdder();
diff --git
a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
index a10bee1711c..8c719b861b0 100644
---
a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
+++
b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
@@ -65,7 +65,7 @@ public class AuthenticationAthenz implements Authentication,
EncodedAuthenticati
// ZTSClient.cancelPrefetch() is called.
// cf. https://github.com/AthenZ/athenz/issues/544
private boolean autoPrefetchEnabled = false;
- private long cachedRoleTokenTimestamp;
+ private volatile long cachedRoleTokenTimestamp;
private String roleToken;
// athenz will only give this token if it's at least valid for 2hrs
private static final int minValidity = 2 * 60 * 60;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 1f34de71979..7138c79bd9e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -58,8 +58,8 @@ public class AutoClusterFailover implements
ServiceUrlProvider {
private final long failoverDelayNs;
private final long switchBackDelayNs;
private final ScheduledExecutorService executor;
- private long recoverTimestamp;
- private long failedTimestamp;
+ private volatile long recoverTimestamp;
+ private volatile long failedTimestamp;
private final long intervalMs;
private static final int TIMEOUT = 30_000;
private final PulsarServiceNameResolver resolver;