Copilot commented on code in PR #25244:
URL: https://github.com/apache/pulsar/pull/25244#discussion_r2806936295
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -958,6 +961,9 @@ private CompletableFuture<Void>
resetCursorInternal(Position finalPosition, Comp
cursor.asyncResetCursor(finalPosition, forceResetValue, new
AsyncCallbacks.ResetCursorCallback() {
@Override
public void resetComplete(Object ctx) {
+ if (dispatcher != null &&
!dispatcher.getConsumers().isEmpty()) {
+ cursor.updateLastActive(true);
Review Comment:
The call to cursor.updateLastActive(true) on line 965 returns a
CompletableFuture<Void> that is being ignored. If the future completes
exceptionally, the exception will be silently swallowed. Consider handling or
logging errors from this async operation.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -913,6 +915,7 @@ private CompletableFuture<Void>
resetCursorInternal(Position finalPosition, Comp
// Lock the Subscription object before locking the Dispatcher object
to avoid deadlocks
synchronized (this) {
if (dispatcher != null && dispatcher.isConsumerConnected()) {
+ cursor.updateLastActive(true);
Review Comment:
The call to cursor.updateLastActive(true) on line 918 returns a
CompletableFuture<Void> that is being ignored. If the future completes
exceptionally, the exception will be silently swallowed. Consider handling or
logging errors from this async operation.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -241,9 +241,10 @@ public CompletableFuture<Void> addConsumer(Consumer
consumer) {
}
private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
- return pendingAckHandle.pendingAckHandleFuture().thenCompose(future ->
{
+ return pendingAckHandle.pendingAckHandleFuture()
+ .thenCompose(__ -> cursor.updateLastActive(true))
Review Comment:
The async call to cursor.updateLastActive(true) on line 245 can fail (as
seen in the implementation where internalAsyncMarkDelete might fail). However,
the error from this operation is not explicitly handled, and it would propagate
to the caller of addConsumer. Consider whether this is the intended behavior or
if the error should be logged and handled differently, especially since
updating lastActive is metadata-related and might not warrant failing the
entire consumer addition.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -862,6 +870,8 @@ private State changeStateIfNotClosed(State newState) {
void initialize(Position position, Map<String, Long> properties,
Map<String, String> cursorProperties,
final VoidCallback callback) {
recoveredCursor(position, properties, cursorProperties, null);
+ // When a cursor is creating, no consumers has connected, so we mark
it as inactive state.
Review Comment:
Typo: "has" should be "have" to match the subject "consumers". The sentence
should read "When a cursor is creating, no consumers have connected..."
```suggestion
// When a cursor is creating, no consumers have connected, so we
mark it as inactive state.
```
##########
managed-ledger/src/main/proto/MLDataFormats.proto:
##########
@@ -84,6 +84,8 @@ message PositionInfo {
// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
repeated LongListMap individualDeletedMessageRanges = 6;
+ // Last active time of the cursor, it will changes when consumer registers
or unregisters.
Review Comment:
The comment says "it will changes when" but should be "it will change when"
(change, not changes - singular verb form after 'will').
```suggestion
// Last active time of the cursor, it will change when consumer
registers or unregisters.
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -363,6 +364,7 @@ public synchronized void removeConsumer(Consumer consumer,
boolean isResetCursor
msgOutFromRemovedConsumer.add(stats.msgOutCounter);
if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
+ cursor.updateLastActive(false);
Review Comment:
The call to cursor.updateLastActive(false) on line 367 returns a
CompletableFuture<Void> that is being ignored. If the future completes
exceptionally, the exception will be silently swallowed. Consider handling or
logging errors from this async operation.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -433,7 +435,7 @@ public void consumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType,
Map<String, Long> properties) {
- cursor.updateLastActive();
+ cursor.updateLastActive(true);
Review Comment:
The call to cursor.updateLastActive(true) on line 438 returns a
CompletableFuture<Void> that is being ignored. If the future completes
exceptionally, the exception will be silently swallowed. Consider handling or
logging errors from this async operation.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,14 +2795,80 @@ public String getName() {
@Override
public long getLastActive() {
+ return Math.abs(lastActive);
+ }
+
+ /**
+ * Get the last active time or inactive time of the cursor.
+ *
+ * @return If the value is negative, it indicates the last inactive time
and has not been active since then.
+ * If the value is positive, it indicates the last active time (the
last time delete was called, markDelete).
+ */
+ @Override
+ public long getLastActiveOrInActive() {
return lastActive;
}
+ @Deprecated
@Override
public void updateLastActive() {
lastActive = System.currentTimeMillis();
}
+ /**
+ * Update the last active time of the cursor.
+ * @param active there are two cases that the active state changes, is
active or not.
+ *
+ */
+ @Override
+ public CompletableFuture<Void> updateLastActive(boolean active) {
+ long previousValue = lastActive;
+ if (active) {
+ lastActive = System.currentTimeMillis();
+ markLastActiveAsUnPersisted();
+ CompletableFuture<Void> res = new CompletableFuture<>();
+ if (previousValue < 0 && isLastActivePersisted(previousValue)) {
+ internalAsyncMarkDelete(markDeletePosition, getProperties(),
new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ res.complete(null);
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ res.completeExceptionally(exception);
+ }
+ }, null, null);
+ return res;
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ // Since it still is not active, we should not change the timestamp.
+ if (previousValue < 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // "lastActive" will be persisted when cursor closes.
+ lastActive = -System.currentTimeMillis();
+ markLastActiveAsUnPersisted();
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private void markLastActiveAsUnPersisted() {
+ // -2 is 0xfffffffffffffffe.
+ lastActive = (lastActive & -2);
+ }
+
+ private void markLastActiveAsPersisted() {
+ // 1 is 0x1.
+ lastActive = (lastActive | 1);
+ }
Review Comment:
The markLastActiveAsPersisted method performs a read-modify-write operation
on the volatile field 'lastActive' without synchronization. This can lead to
race conditions where concurrent calls could overwrite each other's updates or
interfere with concurrent updateLastActive calls. Consider using
AtomicLongFieldUpdater with compareAndSet operations to ensure atomic updates.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -74,10 +74,32 @@ enum IndividualDeletedEntries {
/**
* Update the last active time of the cursor.
- *
+ * @Deprecated call {@link #updateLastActive(boolean)} instead/
*/
+ @Deprecated
void updateLastActive();
+ /**
+ * Update the last active time of the cursor.
+ * @param active when update latest active time, the subscription is not
active(in other words, all consumers are
+ * unregistered), please set the value to "false".
Review Comment:
The comment has a grammatical issue. The parenthetical phrase "in other
words, all consumers are unregistered" clarifies when to set active to false,
but the structure is confusing. Consider rephrasing to: "when updating the
latest active time, if the subscription is not active (in other words, all
consumers are unregistered), please set the value to 'false'."
```suggestion
* @param active when updating the latest active time, if the
subscription is not active (in other words,
* all consumers are unregistered), please set the value
to "false".
```
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,14 +2795,80 @@ public String getName() {
@Override
public long getLastActive() {
+ return Math.abs(lastActive);
+ }
+
+ /**
+ * Get the last active time or inactive time of the cursor.
+ *
+ * @return If the value is negative, it indicates the last inactive time
and has not been active since then.
+ * If the value is positive, it indicates the last active time (the
last time delete was called, markDelete).
+ */
+ @Override
+ public long getLastActiveOrInActive() {
return lastActive;
}
+ @Deprecated
@Override
public void updateLastActive() {
lastActive = System.currentTimeMillis();
}
+ /**
+ * Update the last active time of the cursor.
+ * @param active there are two cases that the active state changes, is
active or not.
+ *
+ */
+ @Override
+ public CompletableFuture<Void> updateLastActive(boolean active) {
+ long previousValue = lastActive;
+ if (active) {
+ lastActive = System.currentTimeMillis();
+ markLastActiveAsUnPersisted();
+ CompletableFuture<Void> res = new CompletableFuture<>();
+ if (previousValue < 0 && isLastActivePersisted(previousValue)) {
+ internalAsyncMarkDelete(markDeletePosition, getProperties(),
new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ res.complete(null);
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ res.completeExceptionally(exception);
+ }
+ }, null, null);
+ return res;
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ // Since it still is not active, we should not change the timestamp.
+ if (previousValue < 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // "lastActive" will be persisted when cursor closes.
+ lastActive = -System.currentTimeMillis();
+ markLastActiveAsUnPersisted();
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private void markLastActiveAsUnPersisted() {
+ // -2 is 0xfffffffffffffffe.
+ lastActive = (lastActive & -2);
+ }
+
+ private void markLastActiveAsPersisted() {
+ // 1 is 0x1.
+ lastActive = (lastActive | 1);
+ }
+
+ public static boolean isLastActivePersisted(long timestamp) {
+ return (Math.abs(timestamp) % 2) == 1;
+ }
Review Comment:
The bit manipulation for tracking persistence uses the least significant bit
(LSB) of the timestamp. However, System.currentTimeMillis() returns
milliseconds since epoch, which is always even (LSB = 0) when freshly
generated. After markLastActiveAsUnPersisted() clears the LSB (line 2860), it
remains 0. Then markLastActiveAsPersisted() sets it to 1 (line 2865). This
means isLastActivePersisted() checks if the value is odd. This logic works, but
the timestamp values will be off by 1ms when marked as persisted. While this
may be acceptable for this use case, consider documenting this intentional 1ms
deviation or using a separate field to track persistence state to avoid any
confusion or edge cases.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,14 +2795,80 @@ public String getName() {
@Override
public long getLastActive() {
+ return Math.abs(lastActive);
+ }
+
+ /**
+ * Get the last active time or inactive time of the cursor.
+ *
+ * @return If the value is negative, it indicates the last inactive time
and has not been active since then.
+ * If the value is positive, it indicates the last active time (the
last time delete was called, markDelete).
+ */
+ @Override
+ public long getLastActiveOrInActive() {
return lastActive;
}
+ @Deprecated
@Override
public void updateLastActive() {
lastActive = System.currentTimeMillis();
}
+ /**
+ * Update the last active time of the cursor.
+ * @param active there are two cases that the active state changes, is
active or not.
+ *
+ */
+ @Override
+ public CompletableFuture<Void> updateLastActive(boolean active) {
+ long previousValue = lastActive;
+ if (active) {
+ lastActive = System.currentTimeMillis();
+ markLastActiveAsUnPersisted();
+ CompletableFuture<Void> res = new CompletableFuture<>();
+ if (previousValue < 0 && isLastActivePersisted(previousValue)) {
+ internalAsyncMarkDelete(markDeletePosition, getProperties(),
new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ res.complete(null);
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ res.completeExceptionally(exception);
+ }
+ }, null, null);
+ return res;
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ // Since it still is not active, we should not change the timestamp.
+ if (previousValue < 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // "lastActive" will be persisted when cursor closes.
+ lastActive = -System.currentTimeMillis();
+ markLastActiveAsUnPersisted();
+ return CompletableFuture.completedFuture(null);
+ }
Review Comment:
There's a race condition in the updateLastActive method. The method reads
'previousValue' from the volatile field 'lastActive' (line 2825), then updates
'lastActive' (line 2827 or 2853), and finally calls
markLastActiveAsUnPersisted() which performs a read-modify-write operation on
'lastActive' (line 2860: lastActive & -2). If two threads call updateLastActive
concurrently, they could interleave in a way that causes one thread's update to
overwrite another's, or the persistence flag manipulation could corrupt the
timestamp. Consider using AtomicLongFieldUpdater with compareAndSet operations
or synchronizing the entire method to ensure atomic updates.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -550,6 +553,7 @@ void recover(final VoidCallback callback) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new
MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
+ updateLastActive(false);
Review Comment:
During cursor recovery on line 556, updateLastActive(false) is called
unconditionally, which sets lastActive to the negative of the current
timestamp. This overwrites any persisted lastActive value. However, lines
592-594 and 672-674 later restore the persisted lastActive from the
ManagedCursorInfo or PositionInfo. This sequence creates a race condition: if
there are no persisted lastActive values in the metadata (e.g., from old
versions), the cursor will be marked as inactive from the recovery time, which
is correct. But the ordering is confusing: calling updateLastActive(false)
before checking if there's a persisted value to restore. Consider reordering
this logic to only call updateLastActive(false) if no persisted value is found,
to make the intent clearer.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -352,7 +353,7 @@ protected Dispatcher reuseOrCreateDispatcher(Dispatcher
dispatcher, Consumer con
@Override
public synchronized void removeConsumer(Consumer consumer, boolean
isResetCursor) throws BrokerServiceException {
- cursor.updateLastActive();
+ cursor.updateLastActive(true);
Review Comment:
The call to cursor.updateLastActive(true) on line 356 returns a
CompletableFuture<Void> that is being ignored. If the future completes
exceptionally, the exception will be silently swallowed. Since removeConsumer
is a synchronized method that can throw BrokerServiceException, consider
whether the async operation should be awaited or if errors should be handled.
Alternatively, if the operation is intentionally fire-and-forget, add a comment
explaining why.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3586,7 +3586,16 @@ public void checkInactiveSubscriptions(long
expirationTimeMillis) {
|| isCompactionSubscription(subName)) {
return;
}
- if (System.currentTimeMillis() - sub.cursor.getLastActive() >
expirationTimeMillis) {
+ // lastActiveOrInActive > 0: latest active time, the value
will be update when consumers registered
+ // or acknowledged messages.
+ // lastActiveOrInActive < 0: latest inactive time, the value
will be update when all consumers
Review Comment:
The comment says "latest active time, the value will be update" but it
should be "updated" (past participle). Same issue on line 3591 with "the value
will be update" which should be "the value will be updated". This grammatical
error appears in both comment lines.
```suggestion
// lastActiveOrInActive > 0: latest active time, the value
will be updated when consumers registered
// or acknowledged messages.
// lastActiveOrInActive < 0: latest inactive time, the value
will be updated when all consumers
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -241,9 +241,10 @@ public CompletableFuture<Void> addConsumer(Consumer
consumer) {
}
private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
- return pendingAckHandle.pendingAckHandleFuture().thenCompose(future ->
{
+ return pendingAckHandle.pendingAckHandleFuture()
+ .thenCompose(__ -> cursor.updateLastActive(true))
+ .thenCompose(future -> {
Review Comment:
The lambda parameter name 'future' on line 246 is misleading. The result of
cursor.updateLastActive(true) is a CompletableFuture<Void>, so the
parameter represents a Void value, not a future. Consider renaming it to '__'
or 'ignore' for clarity, similar to the parameter on line 245.
```suggestion
.thenCompose(__ -> {
```
##########
managed-ledger/src/main/proto/MLDataFormats.proto:
##########
@@ -132,7 +134,7 @@ message ManagedCursorInfo {
repeated LongProperty properties = 5;
// deprecated, do not persist this field anymore
Review Comment:
The field 'lastActive' in ManagedCursorInfo was previously marked as
deprecated (line 137 removes the deprecation). While this change is backward
compatible from a protobuf perspective (optional fields can be re-enabled),
there may be confusion if different versions of the code have conflicting
expectations. Consider adding a comment explaining that this field was
temporarily deprecated but is now being used again, to help future maintainers
understand the history.
```suggestion
// NOTE: this field was previously marked as deprecated and some older
// versions of the code may ignore or avoid persisting it. It is now
// intentionally used again; keep this field for backward compatibility.
```
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -74,10 +74,32 @@ enum IndividualDeletedEntries {
/**
* Update the last active time of the cursor.
- *
+ * @Deprecated call {@link #updateLastActive(boolean)} instead/
Review Comment:
Typo in the deprecation tag: "@Deprecated" should be "@deprecated"
(lowercase 'd' for Javadoc tag). Also, the closing delimiter should be "*/" not
"/".
```suggestion
* @deprecated call {@link #updateLastActive(boolean)} instead.
```
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,14 +2795,80 @@ public String getName() {
@Override
public long getLastActive() {
+ return Math.abs(lastActive);
+ }
+
+ /**
+ * Get the last active time or inactive time of the cursor.
+ *
+ * @return If the value is negative, it indicates the last inactive time
and has not been active since then.
+ * If the value is positive, it indicates the last active time (the
last time delete was called, markDelete).
Review Comment:
The documentation comment on line 2805 says "the last time delete was
called, markDelete" which is confusing. It's unclear what "delete was called"
means in this context. Consider clarifying this to say "the last time the
cursor was actively used (e.g., when messages were acknowledged or mark-delete
was called)" to make it more understandable.
```suggestion
* If the value is positive, it indicates the last time the cursor
was actively used (for example, when
* messages were acknowledged or {@code markDelete} was called).
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]