This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e66ba27  [transaction][acknowledge] Introduce in-memory PENDING_ACK 
state in acknowledgement path (#4265)
e66ba27 is described below

commit e66ba275df7a608a7cd61c91248dd543b1e1edbb
Author: Marvin Cai <[email protected]>
AuthorDate: Mon Jun 17 12:40:33 2019 -0700

    [transaction][acknowledge] Introduce in-memory PENDING_ACK state in 
acknowledgement path (#4265)
    
    Master Issue: #2664
    
    Motivation:
    Add acknowledgeMessage, commit, abort for transaction in 
PersistentSubscription.
    
    Changes:
    Will put message in Pending_ACK status when acknowledgeMessage is called 
with TxnID.
    No real status class introduced, only added collection to hold messages in 
Pending_ACK status.
    Current PR only keep Pending_ACK state in memory, in subsequent PR will 
also persistent these pending acks so we can recover from broker failure.
    
    Add commitTxn to put message to Deleted status.
    Add abortTxn to put message to Pending status.
    
    For normal acknowledgeMessage and redeliverUnacknowledgedMessages, will 
check to see if
    message if message is in Pending_ACK first. If true, will **ignore** those 
acks/redeliverys.
    
    Add unit test.
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   6 +
 pulsar-broker/pom.xml                              |   6 +
 .../service/persistent/PersistentSubscription.java | 338 ++++++++++++++++++++-
 .../broker/service/PersistentTopicE2ETest.java     |  27 +-
 .../persistent/PersistentSubscriptionTest.java     | 284 +++++++++++++++++
 pulsar-transaction/common/pom.xml                  |   2 +-
 .../exception/TransactionConflictException.java}   |  32 +-
 .../exception/package-info.java}                   |  30 +-
 .../pulsar/transaction/impl/common/TxnID.java      |   4 +
 pulsar-transaction/pom.xml                         |   2 +-
 10 files changed, 662 insertions(+), 69 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 124f82a..a936954 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2518,6 +2518,12 @@ public class ManagedCursorImpl implements ManagedCursor {
         return individualDeletedMessages;
     }
 
+    public boolean isMessageDeleted(Position position) {
+        checkArgument(position instanceof PositionImpl);
+        return individualDeletedMessages.contains(((PositionImpl) 
position).getLedgerId(),
+                ((PositionImpl) position).getEntryId()) || ((PositionImpl) 
position).compareTo(markDeletePosition) <= 0 ;
+    }
+
     /**
      * Checks given position is part of deleted-range and returns next 
position of upper-end as all the messages are
      * deleted up to that point.
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 85b88ac..c4f9ae1 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -110,6 +110,12 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-transaction-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
       <artifactId>managed-ledger-original</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 70be6ba..af2dde9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -20,12 +20,20 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.google.common.base.MoreObjects;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -39,7 +47,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -56,10 +66,16 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 public class PersistentSubscription implements Subscription {
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
@@ -77,6 +93,29 @@ public class PersistentSubscription implements Subscription {
     // for connected subscriptions, message expiry will be checked if the 
backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
 
+    // Map to keep track of message ack by each txn.
+    private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashSet<Position>> 
pendingAckMessagesMap;
+
+    // Messages acked by ongoing transaction, pending transaction commit to 
materialize the acks. For faster look up.
+    // Using hashset as a message should only be acked once by one transaction.
+    private ConcurrentOpenHashSet<Position> pendingAckMessages;
+
+    // Message cumulative acked by ongoing transaction, pending transaction 
commit to materialize the ack.
+    // Only one transaction can cumulative ack.
+    // This parameter only keep the the largest Position it cumulative ack,as 
any Position smaller will also be covered.
+    private volatile Position pendingCumulativeAckMessage;
+
+    private static final AtomicReferenceFieldUpdater<PersistentSubscription, 
Position> POSITION_UPDATER =
+            
AtomicReferenceFieldUpdater.newUpdater(PersistentSubscription.class, 
Position.class,
+                    "pendingCumulativeAckMessage");
+
+    // ID of transaction currently using cumulative ack.
+    private volatile TxnID pendingCumulativeAckTxnId;
+
+    private static final AtomicReferenceFieldUpdater<PersistentSubscription, 
TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER =
+            
AtomicReferenceFieldUpdater.newUpdater(PersistentSubscription.class, 
TxnID.class,
+                    "pendingCumulativeAckTxnId");
+
     private static final String REPLICATED_SUBSCRIPTION_PROPERTY = 
"pulsar.replicated.subscription";
 
     // Map of properties that is used to mark this subscription as 
"replicated".
@@ -225,8 +264,15 @@ public class PersistentSubscription implements 
Subscription {
         Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();
 
         if (ackType == AckType.Cumulative) {
+            if (this.pendingCumulativeAckTxnId != null) {
+                log.warn("[{}][{}] An ongoing transaction:{} is doing 
cumulative ack, " +
+                         "new cumulative ack is not allowed till the 
transaction is committed.",
+                          topicName, subName, 
this.pendingCumulativeAckTxnId.toString());
+                return;
+            }
+
             if (positions.size() != 1) {
-                log.warn("[{}][{}] Invalid cumulative ack received with 
multiple message ids", topicName, subName);
+                log.warn("[{}][{}] Invalid cumulative ack received with 
multiple message ids.", topicName, subName);
                 return;
             }
 
@@ -239,7 +285,35 @@ public class PersistentSubscription implements 
Subscription {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Individual acks on {}", topicName, 
subName, positions);
             }
-            cursor.asyncDelete(positions, deleteCallback, positions);
+            // Check if message is acknowledged by ongoing transaction.
+            if ((pendingAckMessages != null && pendingAckMessages.size() != 0) 
|| pendingCumulativeAckMessage != null) {
+                List<Position> positionsSafeToAck;
+                synchronized (PersistentSubscription.this) {
+                    positionsSafeToAck = positions.stream().filter(position -> 
{
+                        checkArgument(position instanceof PositionImpl);
+                        // If single ack try to ack message in pending_ack 
status, skip this ack.
+                        if (pendingAckMessages != null && 
this.pendingAckMessages.contains(position)) {
+                            log.warn("[{}][{}] Invalid acks position conflict 
with an ongoing transaction:{}.",
+                                    topicName, subName, 
this.pendingCumulativeAckTxnId.toString());
+                            return false;
+                        }
+
+                        // If single ack is within range of cumulative ack of 
an ongoing transaction, skip this ack.
+                        if (null != this.pendingCumulativeAckMessage &&
+                                ((PositionImpl) 
position).compareTo((PositionImpl) this.pendingCumulativeAckMessage) <= 0) {
+                            log.warn("[{}][{}] Invalid acks position within 
cumulative ack position of an ongoing " +
+                                    "transaction:{}.", topicName, subName, 
this.pendingCumulativeAckTxnId.toString());
+                            return false;
+                        }
+
+                        return true;
+                    }).collect(Collectors.toList());
+                }
+                cursor.asyncDelete(positionsSafeToAck, deleteCallback, 
positionsSafeToAck);
+            } else {
+                cursor.asyncDelete(positions, deleteCallback, positions);
+            }
+
             dispatcher.getRedeliveryTracker().removeBatch(positions);
         }
 
@@ -262,6 +336,110 @@ public class PersistentSubscription implements 
Subscription {
         }
     }
 
+    /**
+     * Acknowledge message(s) for an ongoing transaction.
+     * <p>
+     * It can be of {@link AckType#Individual} or {@link AckType#Cumulative}. 
Single messages acked by ongoing
+     * transaction will be put in pending_ack state and only marked as deleted 
after transaction is committed.
+     * <p>
+     * Only one transaction is allowed to do cumulative ack on a subscription 
at a given time.
+     * If a transaction do multiple cumulative ack, only the one with largest 
position determined by
+     * {@link PositionImpl#compareTo(PositionImpl)} will be kept as it cover 
all position smaller than it.
+     * <p>
+     * If an ongoing transaction cumulative acked a message and then try to 
ack single message which is
+     * smaller than that one it cumulative acked, it'll succeed.
+     * <p>
+     * If transaction is aborted all messages acked by it will be put back to 
pending state.
+     *
+     * @param txnId                  TransactionID of an ongoing transaction 
trying to sck message.
+     * @param positions              {@link Position}(s) it try to ack.
+     * @param ackType                {@link AckType}.
+     * @throws TransactionConflictException if try to do cumulative ack when 
another ongoing transaction already doing
+     *  cumulative ack or try to single ack message already acked by any 
ongoing transaction.
+     * @throws IllegalArgumentException if try to cumulative ack but passed in 
multiple positions.
+     */
+    public synchronized void acknowledgeMessage(TxnID txnId, List<Position> 
positions, AckType ackType) throws TransactionConflictException {
+        checkArgument(txnId != null, "TransactionID can not be null.");
+        if (AckType.Cumulative == ackType) {
+            // Check if another transaction is already using cumulative ack on 
this subscription.
+            if (this.pendingCumulativeAckTxnId != null && 
this.pendingCumulativeAckTxnId != txnId) {
+                String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnId +
+                                  " try to cumulative ack message while 
transaction:" + this.pendingCumulativeAckTxnId +
+                                  " already cumulative acked messages.";
+                log.error(errorMsg);
+                throw new TransactionConflictException(errorMsg);
+            }
+
+            if (positions.size() != 1) {
+                String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnId +
+                                  " invalid cumulative ack received with 
multiple message ids.";
+                log.error(errorMsg);
+                throw new IllegalArgumentException(errorMsg);
+            }
+
+            Position position = positions.get(0);
+            checkArgument(position instanceof PositionImpl);
+
+            if (((PositionImpl) position).compareTo((PositionImpl) 
cursor.getMarkDeletedPosition()) <= 0) {
+                String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnId +
+                        " try to cumulative ack position: " + position + " 
within range of cursor's " +
+                        "markDeletePosition: " + 
cursor.getMarkDeletedPosition();
+                log.error(errorMsg);
+                throw new TransactionConflictException(errorMsg);
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] TxnID:[{}] Cumulative ack on {}.", 
topicName, subName, txnId.toString(), position);
+            }
+
+             if (this.pendingCumulativeAckTxnId == null) {
+                // Only set pendingCumulativeAckTxnId if no transaction is 
doing cumulative ack.
+                PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, txnId);
+                POSITION_UPDATER.set(this, position);
+            } else if 
(((PositionImpl)position).compareTo((PositionImpl)this.pendingCumulativeAckMessage)
 > 0) {
+                // If new cumulative ack position is greater than current one, 
update it.
+                POSITION_UPDATER.set(this, position);
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] TxnID:[{}] Individual acks on {}", 
topicName, subName, txnId.toString(), positions);
+            }
+
+            if (pendingAckMessagesMap == null) {
+                pendingAckMessagesMap = new ConcurrentOpenHashMap<>();
+            }
+
+            if (pendingAckMessages == null) {
+                pendingAckMessages = new ConcurrentOpenHashSet<>();
+            }
+
+            ConcurrentOpenHashSet<Position> pendingAckMessageForCurrentTxn =
+                    pendingAckMessagesMap.computeIfAbsent(txnId, txn -> new 
ConcurrentOpenHashSet<>());
+
+            for (Position position : positions) {
+                // If try to ack message already acked by some ongoign 
transaction(can be itself), throw exception.
+                // Acking single message within range of cumulative ack(if 
exist) is considered valid operation.
+                if (this.pendingAckMessages.contains(position)) {
+                    String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnId +
+                                      " try to ack message:" + position + " in 
pending ack status.";
+                    log.error(errorMsg);
+                    throw new TransactionConflictException(errorMsg);
+                }
+
+                // If try to ack message already acked by committed 
transaction or normal acknowledge, throw exception.
+                if (((ManagedCursorImpl) cursor).isMessageDeleted(position)) {
+                    String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnId +
+                            " try to ack message:" + position + " already 
acked before.";
+                    log.error(errorMsg);
+                    throw new TransactionConflictException(errorMsg);
+                }
+
+                pendingAckMessageForCurrentTxn.add(position);
+                this.pendingAckMessages.add(position);
+            }
+        }
+    }
+
     private final MarkDeleteCallback markDeleteCallback = new 
MarkDeleteCallback() {
         @Override
         public void markDeleteComplete(Object ctx) {
@@ -706,12 +884,51 @@ public class PersistentSubscription implements 
Subscription {
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer) {
-        dispatcher.redeliverUnacknowledgedMessages(consumer);
+        ConcurrentLongLongPairHashMap positionMap = consumer.getPendingAcks();
+        // Only check if message is in pending_ack status when there's ongoing 
transaction.
+        if (null != positionMap && ((pendingAckMessages != null && 
pendingAckMessages.size() != 0)
+                                                                            || 
pendingCumulativeAckMessage != null)) {
+            List<PositionImpl> pendingPositions = new ArrayList<>();
+            PositionImpl cumulativeAckPosition = (null == 
this.pendingCumulativeAckMessage) ? null :
+                    (PositionImpl) this.pendingCumulativeAckMessage;
+
+            positionMap.asMap().entrySet().forEach(entry -> {
+                PositionImpl position = new PositionImpl(entry.getKey().first, 
entry.getKey().second);
+                if ((pendingAckMessages == null || (pendingAckMessages != null 
&&
+                        !this.pendingAckMessages.contains(position))) &&
+                        (null == cumulativeAckPosition ||
+                                (null != cumulativeAckPosition && 
position.compareTo(cumulativeAckPosition) > 0))) {
+                    pendingPositions.add(position);
+                }
+            });
+
+            dispatcher.redeliverUnacknowledgedMessages(consumer, 
pendingPositions);
+        } else {
+            dispatcher.redeliverUnacknowledgedMessages(consumer);
+        }
     }
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, List<PositionImpl> positions) {
-        dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
+        // If there's ongoing transaction.
+        if ((pendingAckMessages != null && pendingAckMessages.size() != 0) || 
pendingCumulativeAckMessage != null) {
+            // Check if message is in pending_ack status.
+            List<PositionImpl> pendingPositions = new ArrayList<>();
+            PositionImpl cumulativeAckPosition = (null == 
this.pendingCumulativeAckMessage) ? null :
+                    (PositionImpl) this.pendingCumulativeAckMessage;
+
+            positions.forEach(position -> {
+                if ((pendingAckMessages == null || (pendingAckMessages != null 
&&
+                        !this.pendingAckMessages.contains(position))) &&
+                        (null == cumulativeAckPosition ||
+                                (null != cumulativeAckPosition && 
position.compareTo(cumulativeAckPosition) > 0))) {
+                    pendingPositions.add(position);
+                }
+            });
+            dispatcher.redeliverUnacknowledgedMessages(consumer, 
pendingPositions);
+        } else {
+            dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
+        }
     }
 
     @Override
@@ -744,6 +961,119 @@ public class PersistentSubscription implements 
Subscription {
     }
 
     /**
+     * Commit a transaction.
+     *
+     * @param txnId         {@link TxnID} to identify the transaction.
+     * @param properties    Additional user-defined properties that can be 
associated with a particular cursor position.
+     * @throws IllegalArgumentException if given {@link TxnID} is not found in 
this subscription.
+     */
+    public synchronized CompletableFuture<Void> commitTxn(TxnID txnId, 
Map<String,Long> properties) {
+
+        if (pendingAckMessagesMap != null && 
!this.pendingAckMessagesMap.containsKey(txnId)) {
+            String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction with id:" + txnId + " not found.";
+            log.error(errorMsg);
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+        CompletableFuture<Void> marketDeleteFuture = new CompletableFuture<>();
+
+        MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                PositionImpl pos = (PositionImpl) ctx;
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Mark deleted messages until position 
{}", topicName, subName, pos);
+                }
+                marketDeleteFuture.complete(null);
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Failed to mark delete for position ", 
topicName, subName, ctx, exception);
+                }
+                marketDeleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        DeleteCallback deleteCallback = new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object position) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Deleted message at {}", topicName, 
subName, position);
+                }
+                deleteFuture.complete(null);
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object 
ctx) {
+                if (log.isDebugEnabled()) {
+                    log.warn("[{}][{}] Failed to delete message at {}", 
topicName, subName, ctx, exception);
+                }
+                deleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        // It's valid to create transaction then commit without doing any 
operation, which will cause
+        // pendingAckMessagesMap to be null.
+        List<Position> positions = pendingAckMessagesMap != null ? 
this.pendingAckMessagesMap.remove(txnId).values() :
+                                                                               
              Collections.emptyList();
+        // Materialize all single acks.
+        cursor.asyncDelete(positions, deleteCallback, positions);
+        if (pendingAckMessages != null) {
+            positions.forEach(position -> 
this.pendingAckMessages.remove(position));
+        }
+
+        // Materialize cumulative ack.
+        cursor.asyncMarkDelete(this.pendingCumulativeAckMessage, (null == 
properties)?
+                Collections.emptyMap() : properties, markDeleteCallback, 
this.pendingCumulativeAckMessage);
+
+        // Reset txdID and position for cumulative ack.
+        PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
+        POSITION_UPDATER.set(this, null);
+        deleteFuture.runAfterBoth(marketDeleteFuture, () -> 
commitFuture.complete(null))
+                    .exceptionally((exception) -> {
+                        commitFuture.completeExceptionally(exception);
+                        return null;
+                    });
+
+        return commitFuture;
+    }
+
+    /**
+     * Abort a transaction.
+     *
+     * @param txnId  {@link TxnID} to identify the transaction.
+     * @param consumer {@link Consumer} which aborting transaction.
+     *
+     * @throws IllegalArgumentException if given {@link TxnID} is not found in 
this subscription.
+     */
+
+    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer 
consumer) {
+        if (pendingAckMessagesMap != null && 
!this.pendingAckMessagesMap.containsKey(txnId)) {
+            String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction with id:" + txnId + " not found.";
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+        ConcurrentOpenHashSet<Position> pendingAckMessageForCurrentTxn = 
pendingAckMessagesMap != null ?
+                this.pendingAckMessagesMap.remove(txnId) : new 
ConcurrentOpenHashSet();
+        if (pendingAckMessages != null) {
+            pendingAckMessageForCurrentTxn.forEach(position -> 
this.pendingAckMessages.remove(position));
+        }
+        // Reset txdID and position for cumulative ack.
+        PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
+        POSITION_UPDATER.set(this, null);
+        dispatcher.redeliverUnacknowledgedMessages(consumer, 
(List<PositionImpl>)
+                                                                    
(List<?>)pendingAckMessageForCurrentTxn.values());
+        abortFuture.complete(null);
+
+        return abortFuture;
+    }
+
+    /**
      * Return a merged map that contains the cursor properties specified by 
used
      * (eg. when using compaction subscription) and the subscription 
properties.
      */
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 790505d..e06b249 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -1307,7 +1308,8 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         final String subName = "sub2";
 
         Message<String> msg;
-        int totalMessages = 10;
+        List<Message<String>> unackedMessages = new ArrayList<>();
+        int totalMessages = 20;
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topicName)
@@ -1326,12 +1328,11 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
             producer.send("my-message-" + i);
         }
 
-        // (2) Consume and ack messages except first message
-        Message<String> unAckedMsg = null;
+        // (2) Consume and only ack last 10 messages
         for (int i = 0; i < totalMessages; i++) {
             msg = consumer.receive();
-            if (i == 0) {
-                unAckedMsg = msg;
+            if (i >= 10) {
+                unackedMessages.add(msg);
             } else {
                 consumer.acknowledge(msg);
             }
@@ -1339,13 +1340,17 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
 
         consumer.redeliverUnacknowledgedMessages();
 
-        // Verify: msg [L:0] must be redelivered
-        try {
-            msg = consumer.receive(1, TimeUnit.SECONDS);
-            assertEquals(msg.getValue(), unAckedMsg.getValue());
-        } catch (Exception e) {
-            fail("msg should be redelivered ", e);
+        for (int i = 0; i < 10; i++) {
+            // Verify: msg [L:0] must be redelivered
+            try {
+                final Message<String> redeliveredMsg = consumer.receive(1, 
TimeUnit.SECONDS);
+                unackedMessages.removeIf(unackedMessage -> 
unackedMessage.getValue().equals(redeliveredMsg.getValue()));
+            } catch (Exception e) {
+                fail("msg should be redelivered ", e);
+            }
         }
+        // Make sure that first 10 messages that we didn't acknowledge get 
redelivered.
+        assertTrue(unackedMessages.size() == 0);
 
         // Verify no other messages are redelivered
         msg = consumer.receive(100, TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
new file mode 100644
index 0000000..9e65326
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.PersistentTopicTest;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.compaction.Compactor;
+import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.zookeeper.ZooKeeper;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
+import static org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+@PrepareForTest({ ZooKeeperDataCache.class, BrokerService.class })
+public class PersistentSubscriptionTest {
+
+    private PulsarService pulsarMock;
+    private BrokerService brokerMock;
+    private ManagedLedgerFactory mlFactoryMock;
+    private ManagedLedger ledgerMock;
+    private ManagedCursorImpl cursorMock;
+    private ConfigurationCacheService configCacheServiceMock;
+    private PersistentTopic topic;
+    private PersistentSubscription persistentSubscription;
+    private Consumer consumerMock;
+
+    final String successTopicName = 
"persistent://prop/use/ns-abc/successTopic";
+    final String subName = "subscriptionName";
+
+    final TxnID txnID1 = new TxnID(1,1);
+    final TxnID txnID2 = new TxnID(1,2);
+
+    private static final Logger log = 
LoggerFactory.getLogger(PersistentTopicTest.class);
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        pulsarMock = spy(new PulsarService(svcConfig));
+        doReturn(svcConfig).when(pulsarMock).getConfiguration();
+        doReturn(mock(Compactor.class)).when(pulsarMock).getCompactor();
+
+        mlFactoryMock = mock(ManagedLedgerFactory.class);
+        doReturn(mlFactoryMock).when(pulsarMock).getManagedLedgerFactory();
+
+        ZooKeeper zkMock = createMockZooKeeper();
+        doReturn(zkMock).when(pulsarMock).getZkClient();
+        doReturn(createMockBookKeeper(zkMock, 
pulsarMock.getOrderedExecutor().chooseThread(0)))
+                .when(pulsarMock).getBookKeeperClient();
+
+        ZooKeeperCache cache = mock(ZooKeeperCache.class);
+        doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+        CompletableFuture getDataFuture = new CompletableFuture();
+        getDataFuture.complete(Optional.empty());
+        doReturn(getDataFuture).when(cache).getDataAsync(anyString(), any(), 
any());
+        doReturn(cache).when(pulsarMock).getLocalZkCache();
+
+        configCacheServiceMock = mock(ConfigurationCacheService.class);
+        @SuppressWarnings("unchecked")
+        ZooKeeperDataCache<Policies> zkPoliciesDataCacheMock = 
mock(ZooKeeperDataCache.class);
+        
doReturn(zkPoliciesDataCacheMock).when(configCacheServiceMock).policiesCache();
+        
doReturn(configCacheServiceMock).when(pulsarMock).getConfigurationCache();
+        
doReturn(Optional.empty()).when(zkPoliciesDataCacheMock).get(anyString());
+
+        LocalZooKeeperCacheService zkCacheMock = 
mock(LocalZooKeeperCacheService.class);
+        
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkPoliciesDataCacheMock).getAsync(any());
+        doReturn(zkPoliciesDataCacheMock).when(zkCacheMock).policiesCache();
+        doReturn(zkCacheMock).when(pulsarMock).getLocalZkCacheService();
+
+        brokerMock = spy(new BrokerService(pulsarMock));
+        doNothing().when(brokerMock).unloadNamespaceBundlesGracefully();
+        doReturn(brokerMock).when(pulsarMock).getBrokerService();
+
+        ledgerMock = mock(ManagedLedger.class);
+        cursorMock = mock(ManagedCursorImpl.class);
+        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+        doReturn("mockCursor").when(cursorMock).getName();
+        doReturn(new PositionImpl(1, 
50)).when(cursorMock).getMarkDeletedPosition();
+
+        topic = new PersistentTopic(successTopicName, ledgerMock, brokerMock);
+
+        consumerMock = mock(Consumer.class);
+
+        persistentSubscription = new PersistentSubscription(topic, subName, 
cursorMock, false);
+    }
+
+    @AfterMethod
+    public void teardown() throws Exception {
+        brokerMock.close(); //to clear pulsarStats
+        try {
+            pulsarMock.close();
+        } catch (Exception e) {
+            log.warn("Failed to close pulsar service", e);
+            throw e;
+        }
+    }
+
+    @Test
+    public void testCanAcknowledgeAndCommitForTransaction() throws 
TransactionConflictException {
+        List<Position> expectedSinglePositions = new ArrayList<>();
+        expectedSinglePositions.add(new PositionImpl(1, 1));
+        expectedSinglePositions.add(new PositionImpl(1, 3));
+        expectedSinglePositions.add(new PositionImpl(1, 5));
+
+        doAnswer((invocationOnMock) -> {
+            
assertTrue(((List)invocationOnMock.getArguments()[0]).containsAll(expectedSinglePositions));
+            ((AsyncCallbacks.DeleteCallback) 
invocationOnMock.getArguments()[1])
+                    .deleteComplete(invocationOnMock.getArguments()[2]);
+            return null;
+        }).when(cursorMock).asyncDelete(anyList(), 
any(AsyncCallbacks.DeleteCallback.class), anyObject());
+
+        doAnswer((invocationOnMock) -> {
+            
assertTrue(((PositionImpl)invocationOnMock.getArguments()[0]).compareTo(new 
PositionImpl(3, 100)) == 0);
+            ((AsyncCallbacks.MarkDeleteCallback) 
invocationOnMock.getArguments()[2])
+                    .markDeleteComplete(invocationOnMock.getArguments()[3]);
+            return null;
+        }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), 
any(AsyncCallbacks.MarkDeleteCallback.class), anyObject());
+
+        List<Position> positions = new ArrayList<>();
+        positions.add(new PositionImpl(1, 1));
+        positions.add(new PositionImpl(1, 3));
+        positions.add(new PositionImpl(1, 5));
+
+        // Single ack for txn
+        persistentSubscription.acknowledgeMessage(txnID1, positions, 
AckType.Individual);
+
+        positions.clear();
+        positions.add(new PositionImpl(3, 100));
+
+        // Cumulative ack for txn
+        persistentSubscription.acknowledgeMessage(txnID1, positions, 
AckType.Cumulative);
+
+        // Commit txn
+        persistentSubscription.commitTxn(txnID1, Collections.emptyMap());
+
+        // Verify corresponding ledger method was called with expected args.
+        verify(cursorMock, times(1)).asyncDelete(anyList(), any(), any());
+        verify(cursorMock, times(1)).asyncMarkDelete(any(), anyMap(), 
anyObject(), any());
+    }
+
+    @Test
+    public void testCanAcknowledgeAndAbortForTransaction() throws 
TransactionConflictException, BrokerServiceException {
+        List<Position> positions = new ArrayList<>();
+        positions.add(new PositionImpl(2, 1));
+        positions.add(new PositionImpl(2, 3));
+        positions.add(new PositionImpl(2, 5));
+
+        Position[] expectedSinglePositions = {new PositionImpl(3, 1),
+                                        new PositionImpl(3, 3), new 
PositionImpl(3, 5)};
+
+        doAnswer((invocationOnMock) -> {
+            
assertTrue(Arrays.deepEquals(((List)invocationOnMock.getArguments()[0]).toArray(),
 expectedSinglePositions));
+            ((AsyncCallbacks.DeleteCallback) 
invocationOnMock.getArguments()[1])
+                    .deleteComplete(invocationOnMock.getArguments()[2]);
+            return null;
+        }).when(cursorMock).asyncDelete(anyList(), 
any(AsyncCallbacks.DeleteCallback.class), anyObject());
+
+        
doReturn(PulsarApi.CommandSubscribe.SubType.Exclusive).when(consumerMock).subType();
+
+        persistentSubscription.addConsumer(consumerMock);
+
+        // Single ack for txn1
+        persistentSubscription.acknowledgeMessage(txnID1, positions, 
AckType.Individual);
+
+        positions.clear();
+        positions.add(new PositionImpl(1, 100));
+
+        // Cumulative ack for txn1
+        persistentSubscription.acknowledgeMessage(txnID1, positions, 
AckType.Cumulative);
+
+        positions.clear();
+        positions.add(new PositionImpl(2, 1));
+
+        // Can not single ack message already acked.
+        try {
+            persistentSubscription.acknowledgeMessage(txnID2, positions, 
AckType.Individual);
+            fail("Single acknowledge for transaction2 should fail. ");
+        } catch (TransactionConflictException e) {
+            
assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
 " +
+                    "Transaction:(1,2) try to ack message:2:1 in pending ack 
status.");
+        }
+
+        positions.clear();
+        positions.add(new PositionImpl(2, 50));
+
+        // Can not cumulative ack message for another txn.
+        try {
+            persistentSubscription.acknowledgeMessage(txnID2, positions, 
AckType.Cumulative);
+            fail("Cumulative acknowledge for transaction2 should fail. ");
+        } catch (TransactionConflictException e) {
+            System.out.println(e.getMessage());
+            
assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
 " +
+                "Transaction:(1,2) try to cumulative ack message while 
transaction:(1,1) already cumulative acked messages.");
+        }
+
+        positions.clear();
+        positions.add(new PositionImpl(1, 1));
+        positions.add(new PositionImpl(1, 3));
+        positions.add(new PositionImpl(1, 5));
+        positions.add(new PositionImpl(3, 1));
+        positions.add(new PositionImpl(3, 3));
+        positions.add(new PositionImpl(3, 5));
+
+        // Acknowledge from normal consumer will succeed ignoring message 
acked by ongoing transaction.
+        persistentSubscription.acknowledgeMessage(positions, 
AckType.Individual, Collections.emptyMap());
+
+        //Abort txn.
+        persistentSubscription.abortTxn(txnID1, consumerMock);
+
+        positions.clear();
+        positions.add(new PositionImpl(2, 50));
+
+        // Retry above ack, will succeed. As abort has clear pending_ack for 
those messages.
+        persistentSubscription.acknowledgeMessage(txnID2, positions, 
AckType.Cumulative);
+
+        positions.clear();
+        positions.add(new PositionImpl(2, 1));
+
+        persistentSubscription.acknowledgeMessage(txnID2, positions, 
AckType.Individual);
+    }
+}
diff --git a/pulsar-transaction/common/pom.xml 
b/pulsar-transaction/common/pom.xml
index d09bf2b..5f0d5ef 100644
--- a/pulsar-transaction/common/pom.xml
+++ b/pulsar-transaction/common/pom.xml
@@ -69,4 +69,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git 
a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
 
b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/TransactionConflictException.java
similarity index 63%
copy from 
pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
copy to 
pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/TransactionConflictException.java
index 16ce4e4..be1350a 100644
--- 
a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
+++ 
b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/TransactionConflictException.java
@@ -16,33 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.transaction.impl.common;
-
-import com.google.common.annotations.Beta;
-import java.io.Serializable;
-import lombok.Data;
+package org.apache.pulsar.transaction.common.exception;
 
 /**
- * An identifier for representing a transaction.
+ * Exception thrown when a transaction try to acknowledge message when it 
shouldn't.
+ *
  */
-@Beta
-@Data
-public class TxnID implements Serializable {
+public class TransactionConflictException extends Exception {
 
     private static final long serialVersionUID = 0L;
 
-    /*
-     * The most significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long mostSigBits;
-
-    /*
-     * The least significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long leastSigBits;
-
-}
+    public TransactionConflictException(String message) {
+        super(message);
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
 
b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/package-info.java
similarity index 59%
copy from 
pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
copy to 
pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/package-info.java
index 16ce4e4..70fff2c 100644
--- 
a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
+++ 
b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/package-info.java
@@ -16,33 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.transaction.impl.common;
-
-import com.google.common.annotations.Beta;
-import java.io.Serializable;
-import lombok.Data;
-
 /**
- * An identifier for representing a transaction.
+ * Common exception used by pulsar transaction related modules.
  */
-@Beta
-@Data
-public class TxnID implements Serializable {
-
-    private static final long serialVersionUID = 0L;
-
-    /*
-     * The most significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long mostSigBits;
-
-    /*
-     * The least significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long leastSigBits;
-
-}
+package org.apache.pulsar.transaction.common.exception;
\ No newline at end of file
diff --git 
a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
 
b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
index 16ce4e4..6321ac6 100644
--- 
a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
+++ 
b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
@@ -45,4 +45,8 @@ public class TxnID implements Serializable {
      */
     private final long leastSigBits;
 
+    @Override
+    public String toString() {
+        return "(" + mostSigBits + "," + leastSigBits + ")";
+    }
 }
diff --git a/pulsar-transaction/pom.xml b/pulsar-transaction/pom.xml
index 9d28137..4189805 100644
--- a/pulsar-transaction/pom.xml
+++ b/pulsar-transaction/pom.xml
@@ -60,4 +60,4 @@
     </plugins>
   </build>
 
-</project>
+</project>
\ No newline at end of file

Reply via email to