Demogorgon314 commented on code in PR #21406:
URL: https://github.com/apache/pulsar/pull/21406#discussion_r1366731435


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java:
##########
@@ -179,4 +182,57 @@ public void testCloseTransactionBufferWhenTimeout() throws 
Exception {
         Assert.assertTrue(f.isCompletedExceptionally());
     }
 
+    /**
+     * This test verifies the state changes of a TransactionBuffer within a 
topic under different conditions.
+     * Initially, the TransactionBuffer is in a NoSnapshot state upon topic 
creation.
+     * It remains in the NoSnapshot state even after a normal message is sent.
+     * The state changes to Ready only after a transactional message is sent.
+     * The test also ensures that the TransactionBuffer can be correctly 
recovered after the topic is unloaded.
+     */
+    @Test
+    public void testWriteSnapshotWhenFirstTxnMessageSend() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 + 
"/testWriteSnapshotWhenFirstTxnMessageSend";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        persistentTopic.checkIfTransactionBufferRecoverCompletely(true).get();
+        TopicTransactionBuffer topicTransactionBuffer = 
(TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        // The TransactionBuffer should be in NoSnapshot state when the topic 
is initially created
+        Assert.assertEquals(topicTransactionBuffer.getState(), 
TopicTransactionBufferState.State.NoSnapshot);

Review Comment:
   Maybe use Awaitility will better?
   
   ```java
   TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) 
persistentTopic.getTransactionBuffer();
   Awaitility.await().untilAsserted(() -> {
       Assert.assertEquals(topicTransactionBuffer.getState(), 
TopicTransactionBufferState.State.NoSnapshot);
   });
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -255,6 +230,40 @@ public long getCommittedTxnCount() {
 
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
+        buffer.retain();
+        return transactionBufferFuture.thenCompose(ignore -> {
+            if (checkIfNoSnapshot()) {
+                CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
+                
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() 
-> {
+                    if (changeToReadyStateFromNoSnapshot()) {
+                        timer.newTimeout(TopicTransactionBuffer.this,
+                                takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
+                        completableFuture.complete(null);
+                    } else {
+                        //This case should not happen.
+                        log.error("[{} ]Failed to change state of transaction 
buffer to Ready from NoSnapshot",

Review Comment:
   ```suggestion
                           log.error("[{}]Failed to change state of transaction 
buffer to Ready from NoSnapshot",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to