This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 90b4f86922f [feature][txn] Fix individual ack batch message with 
transaction abort redevlier duplicate messages (#14327)
90b4f86922f is described below

commit 90b4f86922fff37b74e9dc572378cb8ba49f16af
Author: congbo <[email protected]>
AuthorDate: Fri Feb 17 09:18:42 2023 +0800

    [feature][txn] Fix individual ack batch message with transaction abort 
redevlier duplicate messages (#14327)
    
    If a batch message is individually acked within a transaction and that 
transaction is aborted, we redeliver the message. However, some bits of the 
batch's ack set may already be acked by another transaction, and re-consuming 
those bits produces a `TransactionConflictException`. We don't need to 
redeliver the bits which are acked by another transaction.
    
    For example, if a batch has batch size 5:
    
    1. txn1 acks 0, 1; the ackSet is 00111
    2. txn2 acks 2, 3, 4; the ackSet is 11000
    3. txn2 is aborted; the bitSet redelivered for this position should be 00111
    4. but currently we don't filter out txn1's ackSet, so the bitSet redelivered 
for this position is 11111
    When filtering the message, we should filter out the bits of the bit set 
which are really acked or are in pendingAck state.
    Also add a test.
    
    (cherry picked from commit e0c0d5e8785ae8933af1bcbb4ddea59f35644c05)
---
 .../mledger/util/PositionAckSetUtil.java           | 12 +++--
 .../broker/service/AbstractBaseDispatcher.java     | 25 +++++++++-
 .../service/persistent/PersistentSubscription.java |  9 +++-
 .../transaction/pendingack/PendingAckHandle.java   | 11 ++++
 .../pendingack/impl/PendingAckHandleDisabled.java  |  5 ++
 .../pendingack/impl/PendingAckHandleImpl.java      | 11 ++++
 .../client/impl/TransactionEndToEndTest.java       | 58 +++++++++++++++++++++-
 7 files changed, 125 insertions(+), 6 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
index 47d4bc2eea0..da3043e7458 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
@@ -46,12 +46,18 @@ public class PositionAckSetUtil {
         if (currentPosition == null || otherPosition == null) {
             return;
         }
-        BitSetRecyclable thisAckSet = 
BitSetRecyclable.valueOf(currentPosition.getAckSet());
-        BitSetRecyclable otherAckSet = 
BitSetRecyclable.valueOf(otherPosition.getAckSet());
+        currentPosition.setAckSet(andAckSet(currentPosition.getAckSet(), 
otherPosition.getAckSet()));
+    }
+
+    //This method is do `and` operation for ack set
+    public static long[] andAckSet(long[] firstAckSet, long[] secondAckSet) {
+        BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(firstAckSet);
+        BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(secondAckSet);
         thisAckSet.and(otherAckSet);
-        currentPosition.setAckSet(thisAckSet.toLongArray());
+        long[] ackSet = thisAckSet.toLongArray();
         thisAckSet.recycle();
         otherAckSet.recycle();
+        return ackSet;
     }
 
     //This method is compare two position which position is bigger than 
another one.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index c9c0300da05..da6be55f8e1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.service;
 
+import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import com.google.common.collect.ImmutableList;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
@@ -39,10 +40,12 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.service.plugin.FilterContext;
+import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -212,8 +215,28 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
             batchSizes.setBatchSize(i, batchSize);
             long[] ackSet = null;
             if (indexesAcks != null && cursor != null) {
+                PositionImpl position = PositionImpl.get(entry.getLedgerId(), 
entry.getEntryId());
                 ackSet = cursor
-                        
.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), 
entry.getEntryId()));
+                        .getDeletedBatchIndexesAsLongArray(position);
+                // some batch messages ack bit sit will be in pendingAck 
state, so don't send all bit sit to consumer
+                if (subscription instanceof PersistentSubscription
+                        && ((PersistentSubscription) subscription)
+                        .getPendingAckHandle() instanceof 
PendingAckHandleImpl) {
+                    PositionImpl positionInPendingAck =
+                            ((PersistentSubscription) 
subscription).getPositionInPendingAck(position);
+                    // if this position not in pendingAck state, don't need to 
do any op
+                    if (positionInPendingAck != null) {
+                        if (positionInPendingAck.hasAckSet()) {
+                            // need to or ackSet in pendingAck state and 
cursor ackSet which bit sit has been acked
+                            if (ackSet != null) {
+                                ackSet = andAckSet(ackSet, 
positionInPendingAck.getAckSet());
+                            } else {
+                                // if actSet is null, use pendingAck ackSet
+                                ackSet = positionInPendingAck.getAckSet();
+                            }
+                        }
+                    }
+                }
                 if (ackSet != null) {
                     indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
                 } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c2999c1c741..ff1c654f1d8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1091,6 +1091,9 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
         return subscriptionProperties;
     }
 
+    public PositionImpl getPositionInPendingAck(PositionImpl position) {
+        return pendingAckHandle.getPositionInPendingAck(position);
+    }
     @Override
     public CompletableFuture<Void> updateSubscriptionProperties(Map<String, 
String> subscriptionProperties) {
         Map<String, String> newSubscriptionProperties;
@@ -1104,7 +1107,6 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
                     this.subscriptionProperties = newSubscriptionProperties;
                 });
     }
-
     /**
      * Return a merged map that contains the cursor properties specified by 
used
      * (eg. when using compaction subscription) and the subscription 
properties.
@@ -1187,4 +1189,9 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(PersistentSubscription.class);
+
+    @VisibleForTesting
+    public PendingAckHandle getPendingAckHandle() {
+        return pendingAckHandle;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index e9984baf007..2d2418859cd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -157,4 +157,15 @@ public interface PendingAckHandle {
      * @return if the PendingAckStore is init.
      */
     boolean checkIfPendingAckStoreInit();
+
+    /**
+     * If it returns null, it means this Position is not in pendingAck.
+     * <p>
+     * If it does not return null, it means this Position is in pendingAck and 
if it is batch Position,
+     * it will return the corresponding ackSet in pendingAck
+     *
+     * @param position {@link Position} witch need to get in pendingAck
+     * @return {@link Position} return the position in pendingAck
+     */
+    PositionImpl getPositionInPendingAck(PositionImpl position);
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 6a0aca6f9d7..46937b6666f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -100,4 +100,9 @@ public class PendingAckHandleDisabled implements 
PendingAckHandle {
     public boolean checkIfPendingAckStoreInit() {
         return false;
     }
+
+    @Override
+    public PositionImpl getPositionInPendingAck(PositionImpl position) {
+        return null;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index d4a23817e63..aa2244a4bd9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -987,6 +987,17 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
         return this.pendingAckStoreFuture != null && 
this.pendingAckStoreFuture.isDone();
     }
 
+    @Override
+    public PositionImpl getPositionInPendingAck(PositionImpl position) {
+        if (individualAckPositions != null) {
+            MutablePair<PositionImpl, Integer> positionPair = 
this.individualAckPositions.get(position);
+            if (positionPair != null) {
+                return positionPair.getLeft();
+            }
+        }
+        return null;
+    }
+
     protected void handleCacheRequest() {
         while (true) {
             Runnable runnable = acceptQueue.poll();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index e3bbe1ad97b..b0a4b28bbdc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -115,6 +115,62 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
+    @Test
+    private void testIndividualAckAbortFilterAckSetInPendingAckState() throws 
Exception {
+        final String topicName = NAMESPACE1 + 
"/testIndividualAckAbortFilterAckSetInPendingAckState";
+        final int count = 9;
+        Producer<Integer> producer = pulsarClient
+                .newProducer(Schema.INT32)
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .batchingMaxMessages(count).create();
+
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient
+                .newConsumer(Schema.INT32)
+                .topic(topicName)
+                .isAckReceiptEnabled(true)
+                .subscriptionName("test")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        for (int i = 0; i < count; i++) {
+            producer.sendAsync(i);
+        }
+
+        Transaction firstTransaction = getTxn();
+
+        Transaction secondTransaction = getTxn();
+
+        // firstTransaction ack the first three messages and don't end the 
firstTransaction
+        for (int i = 0; i < count / 3; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
firstTransaction).get();
+        }
+
+        // if secondTransaction abort we only can receive the middle three 
messages
+        for (int i = 0; i < count / 3; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
secondTransaction).get();
+        }
+
+        // consumer normal ack the last three messages
+        for (int i = 0; i < count / 3; i++) {
+            consumer.acknowledgeAsync(consumer.receive()).get();
+        }
+
+        // if secondTransaction abort we only can receive the middle three 
messages
+        secondTransaction.abort().get();
+
+        // can receive 3 4 5 bit sit message
+        for (int i = 0; i < count / 3; i++) {
+            assertEquals(consumer.receive().getValue().intValue(), i + 3);
+        }
+
+        // can't receive message anymore
+        assertNull(consumer.receive(2, TimeUnit.SECONDS));
+    }
+
     @Test(dataProvider="enableBatch")
     private void produceCommitTest(boolean enableBatch) throws Exception {
         @Cleanup
@@ -641,7 +697,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
     }
 
-    private Transaction getTxn() throws Exception {
+    public Transaction getTxn() throws Exception {
         return pulsarClient
                 .newTransaction()
                 .withTransactionTimeout(10, TimeUnit.SECONDS)

Reply via email to