This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 0a846ad5d61 [fix][txn] Ack the same batch message different batchIndex with transaction (#16032)
0a846ad5d61 is described below

commit 0a846ad5d61df7d4842cbcb78d7dfaa86156fa73
Author: congbo <39078850+congbobo...@users.noreply.github.com>
AuthorDate: Wed Jun 15 21:38:45 2022 +0800

    [fix][txn] Ack the same batch message different batchIndex with transaction (#16032)
---
 .../pendingack/impl/PendingAckHandleImpl.java      | 14 ++++++
 .../pulsar/broker/transaction/TransactionTest.java | 53 ++++++++++++++++++++++
 2 files changed, 67 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 289e3646e48..7fc633dc2a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWit
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -769,6 +770,19 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 }
 
                 if (!individualAckPositions.containsKey(position)) {
+                    /**
+                     * if the position does not exist in individualAckPositions {@link individualAckPositions},
+                     * should new the same position and put the new position into
+                     * the individualAckPositions {@link individualAckPositions}
+                     * because when another ack the same batch message will change the ackSet with the new transaction
+                     * when the tc commits the first txn will ack all of the ackSet which has in pending ack status
+                     * individualAckPositions{@link individualAckPositions} can't include the same position
+                     * object on individualAckOfTransaction {@link individualAckOfTransaction}
+                     */
+                    MutablePair<PositionImpl, Integer> positionPair = positions.get(i);
+                    positionPair.left = PositionImpl.get(positionPair.getLeft().getLedgerId(),
+                            positionPair.getLeft().getEntryId(),
+                            Arrays.copyOf(positionPair.left.getAckSet(), positionPair.left.getAckSet().length));
                     this.individualAckPositions.put(position, positions.get(i));
                 } else {
                     MutablePair<PositionImpl, Integer> positionPair =
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 700536c9f8b..3a94919b275 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -996,4 +996,57 @@ public class TransactionTest extends TransactionTestBase {
 
         transaction.commit().get();
     }
+
+    @Test
+    public void testPendingAckBatchMessageCommit() throws Exception {
+        String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";
+
+        // enable batch index ack
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer(Schema.BYTES)
+                .topic(topic)
+                .enableBatching(true)
+                // ensure that batch message is sent
+                .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+
+        // send batch message, the size is 5
+        for (int i = 0; i < 5; i++) {
+            producer.sendAsync(("test" + i).getBytes());
+        }
+
+        producer.flush();
+
+        Transaction txn1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+        // ack the first message with transaction
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn1).get();
+        Transaction txn2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+        // ack the second message with transaction
+        MessageId messageId = consumer.receive().getMessageId();
+        consumer.acknowledgeAsync(messageId, txn2).get();
+
+        // commit the txn1
+        txn1.commit().get();
+        // abort the txn2
+        txn2.abort().get();
+
+        Transaction txn3 = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.MINUTES).build().get();
+        // repeat ack the second message, can ack successful
+        consumer.acknowledgeAsync(messageId, txn3).get();
+    }
 }