Copilot commented on code in PR #25244:
URL: https://github.com/apache/pulsar/pull/25244#discussion_r2806936295


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -958,6 +961,9 @@ private CompletableFuture<Void> 
resetCursorInternal(Position finalPosition, Comp
                 cursor.asyncResetCursor(finalPosition, forceResetValue, new 
AsyncCallbacks.ResetCursorCallback() {
                     @Override
                     public void resetComplete(Object ctx) {
+                        if (dispatcher != null && 
!dispatcher.getConsumers().isEmpty()) {
+                            cursor.updateLastActive(true);

Review Comment:
   The call to cursor.updateLastActive(true) on line 965 returns a 
CompletableFuture<Void> that is being ignored. If the future completes 
exceptionally, the exception will be silently swallowed. Consider handling or 
logging errors from this async operation.
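
   A minimal sketch of one way to avoid dropping the failure, assuming a
hypothetical helper named FutureLogging.logOnFailure (not part of Pulsar) and a
caller-supplied logging callback:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not part of Pulsar.
final class FutureLogging {
    private FutureLogging() {}

    // Attaches a failure handler to a future whose result the caller does not need.
    // whenComplete returns a new future with the same outcome, so callers may still chain on it.
    static <T> CompletableFuture<T> logOnFailure(CompletableFuture<T> future, String what,
                                                 Consumer<String> logger) {
        return future.whenComplete((ignored, ex) -> {
            if (ex != null) {
                logger.accept("Failed to " + what + ": " + ex);
            }
        });
    }
}
```

   A call site could then wrap the call as
logOnFailure(cursor.updateLastActive(true), "update lastActive", msg -> log.warn(msg)),
assuming a logger with a warn(String) method is in scope.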



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -913,6 +915,7 @@ private CompletableFuture<Void> 
resetCursorInternal(Position finalPosition, Comp
         // Lock the Subscription object before locking the Dispatcher object 
to avoid deadlocks
         synchronized (this) {
             if (dispatcher != null && dispatcher.isConsumerConnected()) {
+                cursor.updateLastActive(true);

Review Comment:
   The call to cursor.updateLastActive(true) on line 918 returns a 
CompletableFuture<Void> that is being ignored. If the future completes 
exceptionally, the exception will be silently swallowed. Consider handling or 
logging errors from this async operation.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -241,9 +241,10 @@ public CompletableFuture<Void> addConsumer(Consumer 
consumer) {
     }
 
     private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
-        return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> 
{
+        return pendingAckHandle.pendingAckHandleFuture()
+            .thenCompose(__ -> cursor.updateLastActive(true))

Review Comment:
   The async call to cursor.updateLastActive(true) on line 245 can fail (as 
seen in the implementation where internalAsyncMarkDelete might fail). However, 
the error from this operation is not explicitly handled, and it would propagate 
to the caller of addConsumer. Consider whether this is the intended behavior or 
if the error should be logged and handled differently, especially since 
updating lastActive is metadata-related and might not warrant failing the 
entire consumer addition.
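
   If failing the consumer addition is not the intended behavior, one option is
to log and swallow the metadata failure before continuing the chain. A minimal
sketch, assuming hypothetical names BestEffortStep and thenBestEffort (neither
exists in Pulsar), with System.err standing in for the broker's logger:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of Pulsar.
final class BestEffortStep {
    private BestEffortStep() {}

    // Runs `next` after `metadataUpdate`, whether or not the update succeeded.
    static <T> CompletableFuture<T> thenBestEffort(CompletableFuture<Void> metadataUpdate,
                                                   Supplier<CompletableFuture<T>> next) {
        return metadataUpdate
                .exceptionally(ex -> {
                    // Recover from the failure so the rest of the chain still runs.
                    System.err.println("Ignoring lastActive update failure: " + ex);
                    return null;
                })
                .thenCompose(ignored -> next.get());
    }
}
```

   Whether swallowing the error is acceptable depends on how much the inactive
subscription check relies on a persisted lastActive value, which is a decision
for the PR author.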



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -862,6 +870,8 @@ private State changeStateIfNotClosed(State newState) {
     void initialize(Position position, Map<String, Long> properties, 
Map<String, String> cursorProperties,
                     final VoidCallback callback) {
         recoveredCursor(position, properties, cursorProperties, null);
+        // When a cursor is creating, no consumers has connected, so we mark 
it as inactive state.

Review Comment:
   Typo: "has" should be "have" to match the subject "consumers". The sentence 
should read "When a cursor is creating, no consumers have connected..."
   ```suggestion
           // When a cursor is creating, no consumers have connected, so we 
mark it as inactive state.
   ```



##########
managed-ledger/src/main/proto/MLDataFormats.proto:
##########
@@ -84,6 +84,8 @@ message PositionInfo {
     // Store which index in the batch message has been deleted
     repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
     repeated LongListMap individualDeletedMessageRanges = 6;
+    // Last active time of the cursor, it will changes when consumer registers 
or unregisters.

Review Comment:
   The comment says "it will changes when" but should be "it will change when" 
(the base verb form "change" follows the modal 'will').
   ```suggestion
       // Last active time of the cursor, it will change when consumer 
registers or unregisters.
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -363,6 +364,7 @@ public synchronized void removeConsumer(Consumer consumer, 
boolean isResetCursor
         msgOutFromRemovedConsumer.add(stats.msgOutCounter);
 
         if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
+            cursor.updateLastActive(false);

Review Comment:
   The call to cursor.updateLastActive(false) on line 367 returns a 
CompletableFuture<Void> that is being ignored. If the future completes 
exceptionally, the exception will be silently swallowed. Consider handling or 
logging errors from this async operation.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -433,7 +435,7 @@ public void consumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
 
     @Override
     public void acknowledgeMessage(List<Position> positions, AckType ackType, 
Map<String, Long> properties) {
-        cursor.updateLastActive();
+        cursor.updateLastActive(true);

Review Comment:
   The call to cursor.updateLastActive(true) on line 438 returns a 
CompletableFuture<Void> that is being ignored. If the future completes 
exceptionally, the exception will be silently swallowed. Consider handling or 
logging errors from this async operation.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,14 +2795,80 @@ public String getName() {
 
     @Override
     public long getLastActive() {
+        return Math.abs(lastActive);
+    }
+
+    /**
+     * Get the last active time or inactive time of the cursor.
+     *
+     * @return If the value is negative, it indicates the last inactive time 
and has not been active since then.
+     *     If the value is positive, it indicates the last active time (the 
last time delete was called, markDelete).
+     */
+    @Override
+    public long getLastActiveOrInActive() {
         return lastActive;
     }
 
+    @Deprecated
     @Override
     public void updateLastActive() {
         lastActive = System.currentTimeMillis();
     }
 
+    /**
+     * Update the last active time of the cursor.
+     * @param active there are two cases that the active state changes, is 
active or not.
+     *
+     */
+    @Override
+    public CompletableFuture<Void> updateLastActive(boolean active) {
+        long previousValue = lastActive;
+        if (active) {
+            lastActive = System.currentTimeMillis();
+            markLastActiveAsUnPersisted();
+            CompletableFuture<Void> res = new CompletableFuture<>();
+            if (previousValue < 0 && isLastActivePersisted(previousValue)) {
+                internalAsyncMarkDelete(markDeletePosition, getProperties(), 
new MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                        res.complete(null);
+                    }
+
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                        res.completeExceptionally(exception);
+                    }
+                }, null, null);
+                return res;
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+
+        // Since it still is not active, we should not change the timestamp.
+        if (previousValue < 0) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // "lastActive" will be persisted when cursor closes.
+        lastActive = -System.currentTimeMillis();
+        markLastActiveAsUnPersisted();
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void markLastActiveAsUnPersisted() {
+        // -2 is 0xfffffffffffffffe.
+        lastActive = (lastActive & -2);
+    }
+
+    private void markLastActiveAsPersisted() {
+        // 1 is 0x1.
+        lastActive = (lastActive | 1);
+    }

Review Comment:
   The markLastActiveAsPersisted method performs a read-modify-write operation 
on the volatile field 'lastActive' without synchronization. This can lead to 
race conditions where concurrent calls could overwrite each other's updates or 
interfere with concurrent updateLastActive calls. Consider using 
AtomicLongFieldUpdater with compareAndSet operations to ensure atomic updates.
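
   A minimal sketch of the suggested approach for the flag bit alone, assuming
a hypothetical standalone class PersistedFlag in place of ManagedCursorImpl.
AtomicLongFieldUpdater.updateAndGet retries a compareAndSet internally, so each
bit flip is applied to the latest value:

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical stand-in for the cursor class; only the flag handling is shown.
final class PersistedFlag {
    private static final AtomicLongFieldUpdater<PersistedFlag> LAST_ACTIVE =
            AtomicLongFieldUpdater.newUpdater(PersistedFlag.class, "lastActive");

    // Must be a volatile long for AtomicLongFieldUpdater to accept it.
    private volatile long lastActive;

    PersistedFlag(long initial) {
        this.lastActive = initial;
    }

    // Atomically sets the low bit, leaving the rest of the value untouched.
    long markPersisted() {
        return LAST_ACTIVE.updateAndGet(this, v -> v | 1L);
    }

    // Atomically clears the low bit, leaving the rest of the value untouched.
    long markUnpersisted() {
        return LAST_ACTIVE.updateAndGet(this, v -> v & ~1L);
    }

    long get() {
        return lastActive;
    }
}
```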



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -74,10 +74,32 @@ enum IndividualDeletedEntries {
 
     /**
      * Update the last active time of the cursor.
-     *
+     * @Deprecated call {@link #updateLastActive(boolean)} instead/
      */
+    @Deprecated
     void updateLastActive();
 
+    /**
+     * Update the last active time of the cursor.
+     * @param active when update latest active time, the subscription is not 
active(in other words, all consumers are
+     *              unregistered), please set the value to "false".

Review Comment:
   The comment has a grammatical issue. The parenthetical phrase "in other 
words, all consumers are unregistered" clarifies when to set active to false, 
but the structure is confusing. Consider rephrasing to: "when updating the 
latest active time, if the subscription is not active (in other words, all 
consumers are unregistered), please set the value to 'false'."
   ```suggestion
        * @param active when updating the latest active time, if the 
subscription is not active (in other words,
        *               all consumers are unregistered), please set the value 
to "false".
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,14 +2795,80 @@ public String getName() {
 
     @Override
     public long getLastActive() {
+        return Math.abs(lastActive);
+    }
+
+    /**
+     * Get the last active time or inactive time of the cursor.
+     *
+     * @return If the value is negative, it indicates the last inactive time 
and has not been active since then.
+     *     If the value is positive, it indicates the last active time (the 
last time delete was called, markDelete).
+     */
+    @Override
+    public long getLastActiveOrInActive() {
         return lastActive;
     }
 
+    @Deprecated
     @Override
     public void updateLastActive() {
         lastActive = System.currentTimeMillis();
     }
 
+    /**
+     * Update the last active time of the cursor.
+     * @param active there are two cases that the active state changes, is 
active or not.
+     *
+     */
+    @Override
+    public CompletableFuture<Void> updateLastActive(boolean active) {
+        long previousValue = lastActive;
+        if (active) {
+            lastActive = System.currentTimeMillis();
+            markLastActiveAsUnPersisted();
+            CompletableFuture<Void> res = new CompletableFuture<>();
+            if (previousValue < 0 && isLastActivePersisted(previousValue)) {
+                internalAsyncMarkDelete(markDeletePosition, getProperties(), 
new MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                        res.complete(null);
+                    }
+
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                        res.completeExceptionally(exception);
+                    }
+                }, null, null);
+                return res;
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+
+        // Since it still is not active, we should not change the timestamp.
+        if (previousValue < 0) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // "lastActive" will be persisted when cursor closes.
+        lastActive = -System.currentTimeMillis();
+        markLastActiveAsUnPersisted();
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void markLastActiveAsUnPersisted() {
+        // -2 is 0xfffffffffffffffe.
+        lastActive = (lastActive & -2);
+    }
+
+    private void markLastActiveAsPersisted() {
+        // 1 is 0x1.
+        lastActive = (lastActive | 1);
+    }
+
+    public static boolean isLastActivePersisted(long timestamp) {
+        return (Math.abs(timestamp) % 2) == 1;
+    }

Review Comment:
   The bit manipulation for tracking persistence uses the least significant bit 
(LSB) of the timestamp. System.currentTimeMillis() returns milliseconds since 
epoch, which can be odd or even, so markLastActiveAsUnPersisted() (line 2860) 
discards the original LSB when it clears it. markLastActiveAsPersisted() then 
sets the LSB to 1 (line 2865), and isLastActivePersisted() checks whether the 
value is odd. This logic works, but the stored timestamp can differ from the 
real one by up to 1ms. While this may be acceptable for this use case, consider 
documenting this intentional 1ms deviation or using a separate field to track 
persistence state to avoid any confusion or edge cases.
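
   The low-bit encoding can be tried in isolation. The sketch below copies the
three helpers from the diff into a hypothetical standalone class,
LastActiveEncoding, with the field access replaced by a parameter so the
arithmetic can be checked without a cursor:

```java
// Standalone copy of the helpers from the diff, for experimentation only.
final class LastActiveEncoding {
    private LastActiveEncoding() {}

    // Clears the low bit: "not persisted".
    static long markUnpersisted(long v) {
        return v & -2L;
    }

    // Sets the low bit: "persisted".
    static long markPersisted(long v) {
        return v | 1L;
    }

    // Same check as isLastActivePersisted in the diff.
    static boolean isPersisted(long v) {
        return (Math.abs(v) % 2) == 1;
    }

    public static void main(String[] args) {
        long oddMillis = 1_700_000_000_123L;
        // An odd timestamp loses 1ms when the flag is cleared: prints 1700000000122
        System.out.println(markUnpersisted(oddMillis));
        // An even timestamp gains 1ms when the flag is set: prints 1700000000123
        System.out.println(markPersisted(1_700_000_000_122L));
        // For negative (inactive) values the magnitude grows instead: prints -1700000000124
        System.out.println(markUnpersisted(-oddMillis));
    }
}
```

   The negative case follows from two's complement: clearing the low bit rounds
toward negative infinity, so the absolute value returned by getLastActive()
moves in the opposite direction from the positive case.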



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,14 +2795,80 @@ public String getName() {
 
     @Override
     public long getLastActive() {
+        return Math.abs(lastActive);
+    }
+
+    /**
+     * Get the last active time or inactive time of the cursor.
+     *
+     * @return If the value is negative, it indicates the last inactive time 
and has not been active since then.
+     *     If the value is positive, it indicates the last active time (the 
last time delete was called, markDelete).
+     */
+    @Override
+    public long getLastActiveOrInActive() {
         return lastActive;
     }
 
+    @Deprecated
     @Override
     public void updateLastActive() {
         lastActive = System.currentTimeMillis();
     }
 
+    /**
+     * Update the last active time of the cursor.
+     * @param active there are two cases that the active state changes, is 
active or not.
+     *
+     */
+    @Override
+    public CompletableFuture<Void> updateLastActive(boolean active) {
+        long previousValue = lastActive;
+        if (active) {
+            lastActive = System.currentTimeMillis();
+            markLastActiveAsUnPersisted();
+            CompletableFuture<Void> res = new CompletableFuture<>();
+            if (previousValue < 0 && isLastActivePersisted(previousValue)) {
+                internalAsyncMarkDelete(markDeletePosition, getProperties(), 
new MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                        res.complete(null);
+                    }
+
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                        res.completeExceptionally(exception);
+                    }
+                }, null, null);
+                return res;
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+
+        // Since it still is not active, we should not change the timestamp.
+        if (previousValue < 0) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // "lastActive" will be persisted when cursor closes.
+        lastActive = -System.currentTimeMillis();
+        markLastActiveAsUnPersisted();
+        return CompletableFuture.completedFuture(null);
+    }

Review Comment:
   There's a race condition in the updateLastActive method. The method reads 
'previousValue' from the volatile field 'lastActive' (line 2825), then updates 
'lastActive' (line 2827 or 2853), and finally calls 
markLastActiveAsUnPersisted() which performs a read-modify-write operation on 
'lastActive' (line 2860: lastActive & -2). If two threads call updateLastActive 
concurrently, they could interleave in a way that causes one thread's update to 
overwrite another's, or the persistence flag manipulation could corrupt the 
timestamp. Consider using AtomicLongFieldUpdater with compareAndSet operations 
or synchronizing the entire method to ensure atomic updates.
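
   A minimal sketch of the compareAndSet variant, assuming a hypothetical class
LastActiveState and using AtomicLong instead of AtomicLongFieldUpdater for
brevity. The timestamp and the cleared flag bit are computed together and
published in a single step, so no other thread can observe or overwrite an
intermediate value. The persistence call from the diff is left out:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the lastActive handling in the cursor.
final class LastActiveState {
    private final AtomicLong lastActive = new AtomicLong();

    // Applies one active/inactive transition atomically and returns the previous value.
    // nowMillis is passed in (rather than read from the clock) to keep the sketch testable.
    long update(boolean active, long nowMillis) {
        while (true) {
            long previous = lastActive.get();
            long next;
            if (active) {
                // Positive timestamp with the low bit cleared ("not persisted").
                next = nowMillis & -2L;
            } else if (previous < 0) {
                // Already inactive: keep the original inactive timestamp.
                return previous;
            } else {
                // Negative timestamp with the low bit cleared ("not persisted").
                next = (-nowMillis) & -2L;
            }
            if (lastActive.compareAndSet(previous, next)) {
                return previous;
            }
            // Another thread changed the value in between; re-read and retry.
        }
    }

    long get() {
        return lastActive.get();
    }
}
```

   The returned previous value lets the caller decide, as the diff does with
previousValue, whether a persisted inactive state needs to be rewritten.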



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -550,6 +553,7 @@ void recover(final VoidCallback callback) {
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new 
MetaStoreCallback<ManagedCursorInfo>() {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                updateLastActive(false);

Review Comment:
   During cursor recovery on line 556, updateLastActive(false) is called 
unconditionally, which sets lastActive to the negative of the current 
timestamp. Lines 592-594 and 672-674 later overwrite it with the persisted 
lastActive from the ManagedCursorInfo or PositionInfo, when one exists. The 
result is correct: if there is no persisted lastActive value in the metadata 
(e.g., from old versions), the cursor is marked as inactive from the recovery 
time. But the ordering is confusing, because updateLastActive(false) runs 
before the check for a persisted value to restore. Consider reordering this 
logic to only call updateLastActive(false) if no persisted value is found, to 
make the intent clearer.
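
   The suggested ordering can be sketched as a single decision, assuming a
hypothetical helper RecoveredLastActive.resolve that receives the persisted
value (if any) as an OptionalLong. How the value is read from ManagedCursorInfo
or PositionInfo is not shown here:

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not part of the managed-ledger code.
final class RecoveredLastActive {
    private RecoveredLastActive() {}

    // Prefers the persisted value; falls back to "inactive since recovery time".
    static long resolve(OptionalLong persisted, long nowMillis) {
        if (persisted.isPresent()) {
            return persisted.getAsLong();
        }
        // Negative means inactive; clearing the low bit means "not persisted yet".
        return (-nowMillis) & -2L;
    }
}
```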



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -352,7 +353,7 @@ protected Dispatcher reuseOrCreateDispatcher(Dispatcher 
dispatcher, Consumer con
 
     @Override
     public synchronized void removeConsumer(Consumer consumer, boolean 
isResetCursor) throws BrokerServiceException {
-        cursor.updateLastActive();
+        cursor.updateLastActive(true);

Review Comment:
   The call to cursor.updateLastActive(true) on line 356 returns a 
CompletableFuture<Void> that is being ignored. If the future completes 
exceptionally, the exception will be silently swallowed. Since removeConsumer 
is a synchronized method that can throw BrokerServiceException, consider 
whether the async operation should be awaited or if errors should be handled. 
Alternatively, if the operation is intentionally fire-and-forget, add a comment 
explaining why.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3586,7 +3586,16 @@ public void checkInactiveSubscriptions(long 
expirationTimeMillis) {
                         || isCompactionSubscription(subName)) {
                     return;
                 }
-                if (System.currentTimeMillis() - sub.cursor.getLastActive() > 
expirationTimeMillis) {
+                // lastActiveOrInActive > 0: latest active time, the value 
will be update when consumers registered
+                //                           or acknowledged messages.
+                // lastActiveOrInActive < 0: latest inactive time, the value 
will be update when all consumers

Review Comment:
   The comment says "the value will be update" but it should be "the value 
will be updated" (past participle). The same error appears again on line 3591, 
so both comment lines need the fix.
   ```suggestion
                   // lastActiveOrInActive > 0: latest active time, the value 
will be updated when consumers registered
                   //                           or acknowledged messages.
                   // lastActiveOrInActive < 0: latest inactive time, the value 
will be updated when all consumers
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -241,9 +241,10 @@ public CompletableFuture<Void> addConsumer(Consumer 
consumer) {
     }
 
     private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
-        return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> 
{
+        return pendingAckHandle.pendingAckHandleFuture()
+            .thenCompose(__ -> cursor.updateLastActive(true))
+            .thenCompose(future -> {

Review Comment:
   The lambda parameter name 'future' on line 246 is misleading. The result of 
cursor.updateLastActive(true) is a CompletableFuture<Void>, so the 
parameter represents a Void value, not a future. Consider renaming it to '__' 
or 'ignore' for clarity, similar to the parameter on line 245.
   ```suggestion
               .thenCompose(__ -> {
   ```



##########
managed-ledger/src/main/proto/MLDataFormats.proto:
##########
@@ -132,7 +134,7 @@ message ManagedCursorInfo {
     repeated LongProperty properties = 5;
 
     // deprecated, do not persist this field anymore

Review Comment:
   The field 'lastActive' in ManagedCursorInfo was previously marked as 
deprecated (line 137 removes the deprecation). While this change is backward 
compatible from a protobuf perspective (optional fields can be re-enabled), 
there may be confusion if different versions of the code have conflicting 
expectations. Consider adding a comment explaining that this field was 
temporarily deprecated but is now being used again, to help future maintainers 
understand the history.
   ```suggestion
       // NOTE: this field was previously marked as deprecated and some older
       // versions of the code may ignore or avoid persisting it. It is now
       // intentionally used again; keep this field for backward compatibility.
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -74,10 +74,32 @@ enum IndividualDeletedEntries {
 
     /**
      * Update the last active time of the cursor.
-     *
+     * @Deprecated call {@link #updateLastActive(boolean)} instead/

Review Comment:
   Typo in the deprecation tag: "@Deprecated" should be "@deprecated" 
(lowercase 'd' for Javadoc tag). Also, the trailing "/" after "instead" is a 
stray character and should be a period.
   ```suggestion
        * @deprecated call {@link #updateLastActive(boolean)} instead.
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2786,14 +2795,80 @@ public String getName() {
 
     @Override
     public long getLastActive() {
+        return Math.abs(lastActive);
+    }
+
+    /**
+     * Get the last active time or inactive time of the cursor.
+     *
+     * @return If the value is negative, it indicates the last inactive time 
and has not been active since then.
+     *     If the value is positive, it indicates the last active time (the 
last time delete was called, markDelete).

Review Comment:
   The documentation comment on line 2805 says "the last time delete was 
called, markDelete" which is confusing. It's unclear what "delete was called" 
means in this context. Consider clarifying this to say "the last time the 
cursor was actively used (e.g., when messages were acknowledged or mark-delete 
was called)" to make it more understandable.
   ```suggestion
        *     If the value is positive, it indicates the last time the cursor 
was actively used (for example, when
        *     messages were acknowledged or {@code markDelete} was called).
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
