This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c8488713a9d [fix][test][branch-3.3] Fix flaky 
AdminApiTransactionTest.testGetTransactionBufferInternalStats
c8488713a9d is described below

commit c8488713a9db2a6ff89e4c12d32b0ac18d95c89e
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Oct 29 13:25:17 2025 +0200

    [fix][test][branch-3.3] Fix flaky 
AdminApiTransactionTest.testGetTransactionBufferInternalStats
    
    - cherry-picked from 
https://github.com/apache/pulsar/pull/23231/commits/c9e72c45bdc00525e83979b17433fa1f8e65ef39
---
 .../broker/admin/v3/AdminApiTransactionTest.java   | 81 ++++++++++++----------
 1 file changed, 43 insertions(+), 38 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 5a192d0159a..e301f051da1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -49,8 +49,8 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.TransactionIsolationLevel;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -625,21 +625,23 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         producer.newMessage(transaction).send();
         transaction.abort().get();
 
-        // Get transaction buffer internal stats and verify single snapshot 
stats
-        TransactionBufferInternalStats stats = admin.transactions()
-                .getTransactionBufferInternalStatsAsync(topic2, true).get();
-        assertEquals(stats.snapshotType, 
AbortedTxnProcessor.SnapshotType.Single.toString());
-        assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
-
-        // Get managed ledger internal stats for the transaction buffer 
snapshot topic
-        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(
-                TopicName.get(topic2).getNamespace() + "/" + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-        
verifyManagedLedgerInternalStats(stats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
-                internalStats);
-        
assertTrue(stats.singleSnapshotSystemTopicInternalStats.managedLedgerName
-                .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
-        assertNull(stats.segmentInternalStats);
-        assertNull(stats.segmentIndexInternalStats);
+        Awaitility.await().untilAsserted(() -> {
+            // Get transaction buffer internal stats and verify single 
snapshot stats
+            TransactionBufferInternalStats stats = admin.transactions()
+                    .getTransactionBufferInternalStatsAsync(topic2, 
true).get();
+            assertEquals(stats.snapshotType, 
AbortedTxnProcessor.SnapshotType.Single.toString());
+            assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
+
+            // Get managed ledger internal stats for the transaction buffer 
snapshot topic
+            PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(
+                    TopicName.get(topic2).getNamespace() + "/" + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+            
verifyManagedLedgerInternalStats(stats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
+                    internalStats);
+            
assertTrue(stats.singleSnapshotSystemTopicInternalStats.managedLedgerName
+                    .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
+            assertNull(stats.segmentInternalStats);
+            assertNull(stats.segmentIndexInternalStats);
+        });
 
         // Configure segmented snapshot and set segment size
         pulsar.getConfig().setTransactionBufferSnapshotSegmentSize(9);
@@ -651,28 +653,31 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         producer.newMessage(transaction).send();
         transaction.abort().get();
 
-        // Get transaction buffer internal stats and verify segmented snapshot 
stats
-        stats = 
admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
-        assertEquals(stats.snapshotType, 
AbortedTxnProcessor.SnapshotType.Segment.toString());
-        assertNull(stats.singleSnapshotSystemTopicInternalStats);
-        assertNotNull(stats.segmentInternalStats);
-
-        // Get managed ledger internal stats for the transaction buffer 
segments topic
-        internalStats = admin.topics().getInternalStats(
-                TopicName.get(topic2).getNamespace() + "/" +
-                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
-        
verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats,
 internalStats);
-        assertTrue(stats.segmentInternalStats.managedLedgerName
-                
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
-
-        // Get managed ledger internal stats for the transaction buffer 
indexes topic
-        assertNotNull(stats.segmentIndexInternalStats);
-        internalStats = admin.topics().getInternalStats(
-                TopicName.get(topic2).getNamespace() + "/" +
-                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
-        
verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats,
 internalStats);
-        assertTrue(stats.segmentIndexInternalStats.managedLedgerName
-                
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+        Awaitility.await().untilAsserted(() -> {
+            // Get transaction buffer internal stats and verify segmented 
snapshot stats
+            TransactionBufferInternalStats stats =
+                    
admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
+            assertEquals(stats.snapshotType, 
AbortedTxnProcessor.SnapshotType.Segment.toString());
+            assertNull(stats.singleSnapshotSystemTopicInternalStats);
+            assertNotNull(stats.segmentInternalStats);
+
+            // Get managed ledger internal stats for the transaction buffer 
segments topic
+            PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(
+                    TopicName.get(topic2).getNamespace() + "/" +
+                            
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+            
verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats,
 internalStats);
+            assertTrue(stats.segmentInternalStats.managedLedgerName
+                    
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
+
+            // Get managed ledger internal stats for the transaction buffer 
indexes topic
+            assertNotNull(stats.segmentIndexInternalStats);
+            internalStats = admin.topics().getInternalStats(
+                    TopicName.get(topic2).getNamespace() + "/" +
+                            
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+            
verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats,
 internalStats);
+            assertTrue(stats.segmentIndexInternalStats.managedLedgerName
+                    
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+        });
     }
 
 

Reply via email to