liangyepianzhou commented on code in PR #20559:
URL: https://github.com/apache/pulsar/pull/20559#discussion_r1316933551


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -250,7 +231,54 @@ public long getCommittedTxnCount() {
     }
 
     @Override
+/**
+ * If it's the first time using the transaction buffer, the method records the max read position of the topic
+ * to metadata of the topic at the time of the first write. If the transaction buffer has been used before,
+ * it writes the message after recovering the transaction buffer.
+ */
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+        if (checkIfReady()) {
+            // Transaction buffer is ready, directly append the buffer
+            return internalAppendBufferToTxn(txnId, sequenceId, buffer);
+        } else if (changeToInitializingState()) {
+            // Transition to initializing state and record max read position if it's the first usage
+            Position firstMaxReadPosition = topic.getManagedLedger().getLastConfirmedEntry();
+            if (managedLedger.getProperties().get(MAX_READ_POSITION) == null) {
+                HashMap<String, String> hashMap = new HashMap<>();
+                hashMap.put(MAX_READ_POSITION, firstMaxReadPosition.toString());
+                managedLedger.asyncSetProperties(hashMap, new AsyncCallbacks.UpdatePropertiesCallback() {
+                    @Override
+                    public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                        changeToReadyState();

Review Comment:
   We already have a scheduled task in the snapshot processor that does something similar.
   We only write the first snapshot to the managedLedger properties, and we should not add more dependencies on metadata.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1459,61 +1459,48 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                     });
 
                     schemaVersionFuture.thenAccept(schemaVersion -> {
-                        topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {

Review Comment:
   `checkIfTransactionBufferRecoverCompletely` is triggered when building the producer.
   We should persist a snapshot when a message is first sent with a transaction, and that is implemented in TransactionBuffer.
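
A minimal sketch of the "persist a snapshot on the first transactional send" idea described above. It is not the PR's code: `FirstUseSnapshotSketch`, `persistFirstSnapshot`, and `append` are hypothetical names, and the snapshot write is modeled as a plain `Supplier<CompletableFuture<Void>>` instead of any real Pulsar API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Toy model of a buffer that persists one snapshot before its first append. */
class FirstUseSnapshotSketch {
    // null means "never used"; otherwise the future of the one-time initialization.
    private final AtomicReference<CompletableFuture<Void>> init = new AtomicReference<>();
    // Hypothetical stand-in for "write the first snapshot".
    private final Supplier<CompletableFuture<Void>> persistFirstSnapshot;

    FirstUseSnapshotSketch(Supplier<CompletableFuture<Void>> persistFirstSnapshot) {
        this.persistFirstSnapshot = persistFirstSnapshot;
    }

    CompletableFuture<String> append(String message) {
        CompletableFuture<Void> ready = init.get();
        if (ready == null) {
            CompletableFuture<Void> mine = new CompletableFuture<>();
            if (init.compareAndSet(null, mine)) {
                // Only the caller that wins the CAS triggers the snapshot write.
                persistFirstSnapshot.get().whenComplete((v, e) -> {
                    if (e != null) {
                        mine.completeExceptionally(e);
                    } else {
                        mine.complete(null);
                    }
                });
                ready = mine;
            } else {
                // Another caller started initialization; wait on its future.
                ready = init.get();
            }
        }
        // Appends are chained after initialization, so none runs before the snapshot exists.
        return ready.thenApply(v -> "appended:" + message);
    }
}
```

In this sketch the producer-creation path does no transaction-buffer work at all; the cost of the snapshot is paid only by topics that actually receive a transactional message.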



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -66,6 +69,8 @@
 @Slf4j
 public class TopicTransactionBuffer extends TopicTransactionBufferState implements TransactionBuffer, TimerTask {
 
+    public static final String MAX_READ_POSITION = "maxReadPosition";

Review Comment:
   Yeah, it is the last confirmed position before the first transaction message.
   When the state of the transaction message is confirmed, this position is the max read position.
   Do you have any other suggestions?
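
To make the "last confirmed position before the first transaction message" idea concrete, here is a toy model. It assumes the usual reading of max read position (consumers may read only up to the entry just before the first message of the oldest ongoing transaction) and uses plain `long` values instead of Pulsar `Position` objects. `MaxReadPositionSketch` and its methods are hypothetical names, not part of the PR.

```java
import java.util.TreeMap;

/** Toy model: positions are plain longs, transactions are identified by strings. */
class MaxReadPositionSketch {
    private long lastConfirmed = -1;   // position of the last entry written
    private long maxReadPosition = -1; // consumers may read up to and including this
    // Ongoing transactions, keyed by the position of their first message.
    private final TreeMap<Long, String> ongoingByFirstPosition = new TreeMap<>();

    long maxReadPosition() {
        return maxReadPosition;
    }

    /** Appends a non-transactional message. */
    long appendNormal() {
        long pos = ++lastConfirmed;
        if (ongoingByFirstPosition.isEmpty()) {
            maxReadPosition = pos; // nothing pending, so readers can see it immediately
        }
        return pos;
    }

    /** Appends a transactional message; max read position stays where it was. */
    long appendTxn(String txnId) {
        long pos = ++lastConfirmed;
        if (!ongoingByFirstPosition.containsValue(txnId)) {
            ongoingByFirstPosition.put(pos, txnId);
        }
        return pos;
    }

    /** Commits or aborts a transaction and advances max read position. */
    void endTxn(String txnId) {
        ongoingByFirstPosition.values().remove(txnId);
        maxReadPosition = ongoingByFirstPosition.isEmpty()
                ? lastConfirmed
                : ongoingByFirstPosition.firstKey() - 1;
    }
}
```

With two normal messages followed by the first message of a transaction, the max read position in this model stays at the second normal message until the transaction ends.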



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
