codelipenghui commented on code in PR #15137:
URL: https://github.com/apache/pulsar/pull/15137#discussion_r860431130


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -292,6 +250,69 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
         return completableFuture;
     }
 
+    private void handleMetadataEntry(PositionImpl logPosition, 
PendingAckMetadataEntry pendingAckMetadataEntry) {
+        // store the persistent position in to memory
+        // store the max position of this entry retain
+        if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+                && pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.COMMIT) {
+            Optional<PendingAckMetadata> optional = 
pendingAckMetadataEntry.getPendingAckMetadatasList()
+                    .stream().max((o1, o2) -> 
ComparisonChain.start().compare(o1.getLedgerId(),
+                            o2.getLedgerId()).compare(o1.getEntryId(), 
o2.getEntryId()).result());
+
+            optional.ifPresent(pendingAckMetadata -> {
+                PositionImpl nowPosition = 
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                        pendingAckMetadata.getEntryId());
+
+                if (nowPosition.compareTo(maxAckPosition) > 0) {
+                    maxAckPosition = nowPosition;
+                }
+                if (currentIndexLag.get() >= maxIndexLag) {
+                    pendingAckLogIndex.compute(maxAckPosition,
+                            (thisPosition, otherPosition) -> logPosition);
+                    maxIndexLag = 
logIndexBackoff.next(pendingAckLogIndex.size());
+                    currentIndexLag.set(0);
+                }
+            });
+        }
+    }
+
+    private void clearUselessLogData() {
+        if (!pendingAckLogIndex.isEmpty()) {
+            PositionImpl deletePosition = null;
+            while (!pendingAckLogIndex.isEmpty()
+                    && pendingAckLogIndex.firstKey() != null
+                    && subManagedCursor.getPersistentMarkDeletedPosition() != 
null
+                    && pendingAckLogIndex.firstEntry().getKey()
+                    .compareTo((PositionImpl) 
subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
+                deletePosition = pendingAckLogIndex.firstEntry().getValue();
+                pendingAckLogIndex.remove(pendingAckLogIndex.firstKey());
+            }
+
+            if (deletePosition != null) {
+                maxIndexLag = logIndexBackoff.next(pendingAckLogIndex.size());

Review Comment:
   Do we need to update `maxIndexLag` here? Can't we just rely on 
`handleMetadataEntry()` to update it?
   
   Having only one place that updates `maxIndexLag` will keep the 
implementation simple.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -292,6 +250,69 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
         return completableFuture;
     }
 
+    private void handleMetadataEntry(PositionImpl logPosition, 
PendingAckMetadataEntry pendingAckMetadataEntry) {
+        // store the persistent position in to memory
+        // store the max position of this entry retain
+        if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+                && pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.COMMIT) {
+            Optional<PendingAckMetadata> optional = 
pendingAckMetadataEntry.getPendingAckMetadatasList()
+                    .stream().max((o1, o2) -> 
ComparisonChain.start().compare(o1.getLedgerId(),
+                            o2.getLedgerId()).compare(o1.getEntryId(), 
o2.getEntryId()).result());
+
+            optional.ifPresent(pendingAckMetadata -> {
+                PositionImpl nowPosition = 
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                        pendingAckMetadata.getEntryId());
+
+                if (nowPosition.compareTo(maxAckPosition) > 0) {
+                    maxAckPosition = nowPosition;
+                }
+                if (currentIndexLag.get() >= maxIndexLag) {
+                    pendingAckLogIndex.compute(maxAckPosition,
+                            (thisPosition, otherPosition) -> logPosition);
+                    maxIndexLag = 
logIndexBackoff.next(pendingAckLogIndex.size());
+                    currentIndexLag.set(0);
+                }

Review Comment:
   In the current implementation, can multiple threads access 
`handleMetadataEntry()` at the same time? We only allow new incoming 
requests after the pending ack changes to the `Ready` state. When adding 
entries to the managed ledger, the callback thread is always the same thread. 
So it looks like we don't need to consider thread-safety problems. 
@congbobo184 Please correct me if I missed something.
   
   If multiple threads can indeed access this method, the implementation is 
incorrect: `maxAckPosition` could be changed by another thread before it is 
put into `pendingAckLogIndex`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -292,6 +250,69 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
         return completableFuture;
     }
 
+    private void handleMetadataEntry(PositionImpl logPosition, 
PendingAckMetadataEntry pendingAckMetadataEntry) {
+        // store the persistent position in to memory
+        // store the max position of this entry retain
+        if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+                && pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.COMMIT) {
+            Optional<PendingAckMetadata> optional = 
pendingAckMetadataEntry.getPendingAckMetadatasList()
+                    .stream().max((o1, o2) -> 
ComparisonChain.start().compare(o1.getLedgerId(),
+                            o2.getLedgerId()).compare(o1.getEntryId(), 
o2.getEntryId()).result());
+
+            optional.ifPresent(pendingAckMetadata -> {
+                PositionImpl nowPosition = 
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                        pendingAckMetadata.getEntryId());
+
+                if (nowPosition.compareTo(maxAckPosition) > 0) {
+                    maxAckPosition = nowPosition;
+                }
+                if (currentIndexLag.get() >= maxIndexLag) {
+                    pendingAckLogIndex.compute(maxAckPosition,
+                            (thisPosition, otherPosition) -> logPosition);
+                    maxIndexLag = 
logIndexBackoff.next(pendingAckLogIndex.size());
+                    currentIndexLag.set(0);
+                }
+            });
+        }
+    }
+
+    private void clearUselessLogData() {
+        if (!pendingAckLogIndex.isEmpty()) {
+            PositionImpl deletePosition = null;
+            while (!pendingAckLogIndex.isEmpty()
+                    && pendingAckLogIndex.firstKey() != null
+                    && subManagedCursor.getPersistentMarkDeletedPosition() != 
null
+                    && pendingAckLogIndex.firstEntry().getKey()
+                    .compareTo((PositionImpl) 
subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
+                deletePosition = pendingAckLogIndex.firstEntry().getValue();
+                pendingAckLogIndex.remove(pendingAckLogIndex.firstKey());

Review Comment:
   ```suggestion
                   deletePosition = 
pendingAckLogIndex.remove(pendingAckLogIndex.firstKey());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to