codelipenghui commented on code in PR #15137:
URL: https://github.com/apache/pulsar/pull/15137#discussion_r860431130
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -292,6 +250,69 @@ public void addFailed(ManagedLedgerException exception,
Object ctx) {
return completableFuture;
}
+ private void handleMetadataEntry(PositionImpl logPosition,
PendingAckMetadataEntry pendingAckMetadataEntry) {
+ // store the persistent position in to memory
+ // store the max position of this entry retain
+ if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+ && pendingAckMetadataEntry.getPendingAckOp() !=
PendingAckOp.COMMIT) {
+ Optional<PendingAckMetadata> optional =
pendingAckMetadataEntry.getPendingAckMetadatasList()
+ .stream().max((o1, o2) ->
ComparisonChain.start().compare(o1.getLedgerId(),
+ o2.getLedgerId()).compare(o1.getEntryId(),
o2.getEntryId()).result());
+
+ optional.ifPresent(pendingAckMetadata -> {
+ PositionImpl nowPosition =
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+ pendingAckMetadata.getEntryId());
+
+ if (nowPosition.compareTo(maxAckPosition) > 0) {
+ maxAckPosition = nowPosition;
+ }
+ if (currentIndexLag.get() >= maxIndexLag) {
+ pendingAckLogIndex.compute(maxAckPosition,
+ (thisPosition, otherPosition) -> logPosition);
+ maxIndexLag =
logIndexBackoff.next(pendingAckLogIndex.size());
+ currentIndexLag.set(0);
+ }
+ });
+ }
+ }
+
+ private void clearUselessLogData() {
+ if (!pendingAckLogIndex.isEmpty()) {
+ PositionImpl deletePosition = null;
+ while (!pendingAckLogIndex.isEmpty()
+ && pendingAckLogIndex.firstKey() != null
+ && subManagedCursor.getPersistentMarkDeletedPosition() !=
null
+ && pendingAckLogIndex.firstEntry().getKey()
+ .compareTo((PositionImpl)
subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
+ deletePosition = pendingAckLogIndex.firstEntry().getValue();
+ pendingAckLogIndex.remove(pendingAckLogIndex.firstKey());
+ }
+
+ if (deletePosition != null) {
+ maxIndexLag = logIndexBackoff.next(pendingAckLogIndex.size());
Review Comment:
Do we need to update the `maxIndexLag` here? We can just rely on
`handleMetadataEntry()` to update the `maxIndexLag`?
Making only one place to update the `maxIndexLag` will keep the
implementation simple.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -292,6 +250,69 @@ public void addFailed(ManagedLedgerException exception,
Object ctx) {
return completableFuture;
}
+ private void handleMetadataEntry(PositionImpl logPosition,
PendingAckMetadataEntry pendingAckMetadataEntry) {
+ // store the persistent position in to memory
+ // store the max position of this entry retain
+ if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+ && pendingAckMetadataEntry.getPendingAckOp() !=
PendingAckOp.COMMIT) {
+ Optional<PendingAckMetadata> optional =
pendingAckMetadataEntry.getPendingAckMetadatasList()
+ .stream().max((o1, o2) ->
ComparisonChain.start().compare(o1.getLedgerId(),
+ o2.getLedgerId()).compare(o1.getEntryId(),
o2.getEntryId()).result());
+
+ optional.ifPresent(pendingAckMetadata -> {
+ PositionImpl nowPosition =
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+ pendingAckMetadata.getEntryId());
+
+ if (nowPosition.compareTo(maxAckPosition) > 0) {
+ maxAckPosition = nowPosition;
+ }
+ if (currentIndexLag.get() >= maxIndexLag) {
+ pendingAckLogIndex.compute(maxAckPosition,
+ (thisPosition, otherPosition) -> logPosition);
+ maxIndexLag =
logIndexBackoff.next(pendingAckLogIndex.size());
+ currentIndexLag.set(0);
+ }
Review Comment:
From the current implementation, do we have multiple threads access the
`handleMetadataEntry()` at the same time? We only allow the new incoming
requests after the pending ack change to `Ready` state. For adding entries to
the managed ledger, the callback thread is always the same thread. So looks
like we don't need to consider thread-safe problems. @congbobo184 Please help
correct if I missed something.
If there are indeed multiple threads accessing this method, the
implementation is incorrect. The `maxAckPosition` will changed by another
thread before put into the `pendingAckLogIndex`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -292,6 +250,69 @@ public void addFailed(ManagedLedgerException exception,
Object ctx) {
return completableFuture;
}
+ private void handleMetadataEntry(PositionImpl logPosition,
PendingAckMetadataEntry pendingAckMetadataEntry) {
+ // store the persistent position in to memory
+ // store the max position of this entry retain
+ if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+ && pendingAckMetadataEntry.getPendingAckOp() !=
PendingAckOp.COMMIT) {
+ Optional<PendingAckMetadata> optional =
pendingAckMetadataEntry.getPendingAckMetadatasList()
+ .stream().max((o1, o2) ->
ComparisonChain.start().compare(o1.getLedgerId(),
+ o2.getLedgerId()).compare(o1.getEntryId(),
o2.getEntryId()).result());
+
+ optional.ifPresent(pendingAckMetadata -> {
+ PositionImpl nowPosition =
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+ pendingAckMetadata.getEntryId());
+
+ if (nowPosition.compareTo(maxAckPosition) > 0) {
+ maxAckPosition = nowPosition;
+ }
+ if (currentIndexLag.get() >= maxIndexLag) {
+ pendingAckLogIndex.compute(maxAckPosition,
+ (thisPosition, otherPosition) -> logPosition);
+ maxIndexLag =
logIndexBackoff.next(pendingAckLogIndex.size());
+ currentIndexLag.set(0);
+ }
+ });
+ }
+ }
+
+ private void clearUselessLogData() {
+ if (!pendingAckLogIndex.isEmpty()) {
+ PositionImpl deletePosition = null;
+ while (!pendingAckLogIndex.isEmpty()
+ && pendingAckLogIndex.firstKey() != null
+ && subManagedCursor.getPersistentMarkDeletedPosition() !=
null
+ && pendingAckLogIndex.firstEntry().getKey()
+ .compareTo((PositionImpl)
subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
+ deletePosition = pendingAckLogIndex.firstEntry().getValue();
+ pendingAckLogIndex.remove(pendingAckLogIndex.firstKey());
Review Comment:
```suggestion
deletePosition =
pendingAckLogIndex.remove(pendingAckLogIndex.firstKey());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]