liangyepianzhou commented on code in PR #18273:
URL: https://github.com/apache/pulsar/pull/18273#discussion_r1095339686
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AbortTxnProcessorTest.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pulsar.broker.transaction;
+
+import java.lang.reflect.Field;
+import java.util.LinkedList;
+import java.util.NavigableMap;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import
org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AbortTxnProcessorTest extends TransactionTestBase {
+
+ private static final String PROCESSOR_TOPIC = "persistent://" + NAMESPACE1
+ "/abortedTxnProcessor";
+ private static final int SEGMENT_SIZE = 5;
+ private PulsarService pulsarService = null;
+
+ @Override
+ @BeforeClass
+ protected void setup() throws Exception {
+ setUpBase(1, 1, PROCESSOR_TOPIC, 0);
+ this.pulsarService = getPulsarServiceList().get(0);
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 +
PROCESSOR_TOPIC.length() + 5 * 3);
+ }
+
+ @Override
+ @AfterClass
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ /**
+ * Test api:
+ * 1. putAbortedTxnAndPosition
+ * 2. checkAbortedTransaction
+ * 3. takeAbortedTxnsSnapshot
+ * 4. recoverFromSnapshot
+ * 5. trimExpiredAbortedTxns
+ * @throws Exception
+ */
+ @Test
+ public void testPutAbortedTxnIntoProcessor() throws Exception {
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsarService.getBrokerService()
+ .getTopic(PROCESSOR_TOPIC, false).get().get();
+ AbortedTxnProcessor processor = new
SnapshotSegmentAbortedTxnProcessorImpl(persistentTopic);
+ //1. prepare test data.
+ //1.1 Put 10 aborted txn IDs to persistent two sealed segments.
+ for (int i = 0; i < 10; i++) {
+ TxnID txnID = new TxnID(0, i);
+ PositionImpl position = new PositionImpl(0,i);
+ processor.putAbortedTxnAndPosition(txnID, position);
+ }
+ //1.2 Put 4 aborted txn IDs into the unsealed segment.
+ for (int i = 10; i < 14; i++) {
+ TxnID txnID = new TxnID(0, i);
+ PositionImpl position = new PositionImpl(0,i);
+ processor.putAbortedTxnAndPosition(txnID, position);
+ }
+ //1.3 Verify the common data flow
+ verifyAbortedTxnIDAndSegmentIndex(processor,0,14);
+ //2. Take the latest snapshot and verify recover from snapshot
+ AbortedTxnProcessor newProcessor = new
SnapshotSegmentAbortedTxnProcessorImpl(persistentTopic);
+ PositionImpl maxReadPosition = new PositionImpl(0, 14);
+ //2.1 Avoid update operation being canceled.
+ waitTaskExecuteCompletely(processor);
+ //2.2 take the latest snapshot
+ processor.takeAbortedTxnsSnapshot(maxReadPosition).get();
+ newProcessor.recoverFromSnapshot().get();
+ //Verify the recovery data flow
+ verifyAbortedTxnIDAndSegmentIndex(newProcessor,0,14);
+ //3. Delete the ledgers and then verify the date.
+ Field ledgersField =
ManagedLedgerImpl.class.getDeclaredField("ledgers");
+ ledgersField.setAccessible(true);
+ NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers
=
+ (NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo>)
+ ledgersField.get(persistentTopic.getManagedLedger());
+ ledgers.forEach((k, v) -> {
Review Comment:
There is a clear snapshot test `clearTransactionBufferSnapshotTest` in
`TopicTransactionBufferRecoverTest`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]