liangyepianzhou commented on code in PR #22707:
URL: https://github.com/apache/pulsar/pull/22707#discussion_r1623371207


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java:
##########
@@ -377,8 +379,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl 
readPosition) {
 
     @Override
     public void syncMaxReadPositionForNormalPublish(PositionImpl position, 
boolean isMarkerMessage) {
-        if (!isMarkerMessage && maxReadPositionCallBack != null) {
-            maxReadPositionCallBack.maxReadPositionMovedForward(null, 
position);
+        if (!isMarkerMessage) {
+            updateLastDispatchablePosition(position);

Review Comment:
   This write approach is strange. Please follow the previous way.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java:
##########
@@ -331,6 +332,7 @@ public CompletableFuture<Void> abortTxn(TxnID txnID, long 
lowWaterMark) {
             TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
             txnBuffer.abort();
             buffers.remove(txnID, txnBuffer);
+            updateLastDispatchablePosition(null);

Review Comment:
   We should not update the position to null even if failed to commit a 
transaction.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java:
##########
@@ -307,6 +307,7 @@ public CompletableFuture<Void> commitTxn(TxnID txnID, long 
lowWaterMark) {
                 txnBuffer.commitAt(committedAtLedgerId, committedAtEntryId);
                 addTxnToTxnIdex(txnID, committedAtLedgerId);
             }
+            updateLastDispatchablePosition(null);

Review Comment:
   Why do you update this position to null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to