congbobo184 commented on code in PR #17847:
URL: https://github.com/apache/pulsar/pull/17847#discussion_r1008807202


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+    private final PersistentTopic topic;
+    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    private volatile PositionImpl maxReadPosition;

Review Comment:
   This field can be deleted, right?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+    /**
+     * After the transaction buffer writes a transaction aborted mark to the 
topic,
+     * the transaction buffer will add the aborted transaction ID to 
AbortedTxnProcessor.
+     * @param txnID aborted transaction ID.
+     */
+    void appendAbortedTxn(TxnID txnID, PositionImpl position);
+
+    /**
+     * Pulsar has a configuration for ledger retention time.
+     * If the transaction aborted mark position has been deleted, the 
transaction is valid and can be clear.

Review Comment:
   Clean up invalid aborted transactions.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,675 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SnapshotSegmentAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+
+    //Store the latest aborted transaction IDs and the latest max read 
position.
+    private PositionImpl maxReadPosition;
+    private ArrayList<TxnID> unsealedAbortedTxnIdSegment = new ArrayList<>();
+
+    //Store the fixed aborted transaction segment
+    private final ConcurrentSkipListMap<PositionImpl, ArrayList<TxnID>> 
abortTxnSegments
+            = new ConcurrentSkipListMap<>();
+
+    private final ConcurrentSkipListMap<PositionImpl, 
TransactionBufferSnapshotIndex> indexes
+            = new ConcurrentSkipListMap<>();
+    //The latest persistent snapshot index. This is used to combine new 
segment indexes with the latest metadata and
+    // indexes.
+    private TransactionBufferSnapshotIndexes persistentSnapshotIndexes = new 
TransactionBufferSnapshotIndexes();
+
+    private final Timer timer;
+
+    private final PersistentTopic topic;
+
+    //When add abort or change max read position, the count will +1. Take 
snapshot will set 0 into it.
+    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new 
AtomicLong();
+
+    private volatile long lastSnapshotTimestamps;
+
+    //Configurations
+    private final int takeSnapshotIntervalNumber;
+
+    private final int takeSnapshotIntervalTime;
+
+    private final int transactionBufferMaxAbortedTxnsOfSnapshotSegment;
+    private final PersistentWorker persistentWorker;
+
+    public SnapshotSegmentAbortedTxnProcessorImpl(PersistentTopic topic) {
+        this.topic = topic;
+        this.persistentWorker = new PersistentWorker(topic);
+        this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+        this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
+        this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+        this.transactionBufferMaxAbortedTxnsOfSnapshotSegment =  
topic.getBrokerService().getPulsar()
+                .getConfiguration().getTransactionBufferSnapshotSegmentSize();
+        this.timer = 
topic.getBrokerService().getPulsar().getTransactionTimer();
+    }
+
+    @Override
+    public void appendAbortedTxn(TxnID abortedTxnId, PositionImpl 
maxReadPosition) {
+        unsealedAbortedTxnIdSegment.add(abortedTxnId);
+        //The size of lastAbortedTxns reaches the configuration of the size of 
snapshot segment.
+        if (unsealedAbortedTxnIdSegment.size() == 
transactionBufferMaxAbortedTxnsOfSnapshotSegment) {
+            changeMaxReadPositionAndAddAbortTimes.set(0);
+            abortTxnSegments.put(maxReadPosition, unsealedAbortedTxnIdSegment);
+            
persistentWorker.appendTask(PersistentWorker.OperationType.WriteSegment, () ->
+                    
persistentWorker.takeSnapshotSegmentAsync(unsealedAbortedTxnIdSegment, 
maxReadPosition));
+            unsealedAbortedTxnIdSegment = new ArrayList<>();
+        }
+    }
+
+    @Override
+    public void updateMaxReadPosition(Position position) {
+        if (position != this.maxReadPosition) {
+            this.maxReadPosition = (PositionImpl) position;
+            updateSnapshotIndexMetadataByChangeTimes();
+        }
+    }
+
+    @Override
+    public void updateMaxReadPositionNotIncreaseChangeTimes(Position 
maxReadPosition) {
+        this.maxReadPosition = (PositionImpl) maxReadPosition;
+    }
+
+    @Override
+    public boolean checkAbortedTransaction(TxnID txnID, Position readPosition) 
{
+        if (readPosition == null) {
+            return abortTxnSegments.values().stream()
+                    .anyMatch(list -> list.contains(txnID)) || 
unsealedAbortedTxnIdSegment.contains(txnID);
+        } else {
+            PositionImpl maxReadPosition = 
abortTxnSegments.ceilingKey((PositionImpl) readPosition);

Review Comment:
   If it is not found in the sealed segments, checking the unsealed segment directly is fine.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+    private final PersistentTopic topic;
+    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    private volatile PositionImpl maxReadPosition;
+    /**
+     * Aborts, map for jude message is aborted, linked for remove abort txn in 
memory when this
+     * position have been deleted.
+     */
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+
+    private volatile long lastSnapshotTimestamps;
+    private final int takeSnapshotIntervalNumber;
+
+    private final int takeSnapshotIntervalTime;
+
+
+    // when add abort or change max read position, the count will +1. Take 
snapshot will set 0 into it.
+    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new 
AtomicLong();
+
+
+    public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
+        this.topic = topic;
+        this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
+                .getTransactionBufferSnapshotServiceFactory()
+                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+        this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
+        this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+    }
+
+    @Override
+    public void appendAbortedTxn(TxnID abortedTxnId, PositionImpl position) {
+        aborts.put(abortedTxnId, position);
+    }
+
+    @Override
+    public void trimExpiredAbortedTxns() {
+        while (!aborts.isEmpty() && !((ManagedLedgerImpl) 
topic.getManagedLedger())
+                .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Topic transaction buffer clear aborted 
transaction, TxnId : {}, Position : {}",
+                        topic.getName(), aborts.firstKey(), 
aborts.get(aborts.firstKey()));
+            }
+            aborts.remove(aborts.firstKey());
+        }
+    }
+
+    @Override
+    public boolean checkAbortedTransaction(TxnID txnID, Position readPosition) 
{
+        return aborts.containsKey(txnID);
+    }
+
+
+    @Override
+    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
+        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                .getTxnBufferSnapshotService()
+                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
+                    PositionImpl startReadCursorPosition = null;
+                    try {
+                        while (reader.hasMoreEvents()) {
+                            Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                            if (topic.getName().equals(message.getKey())) {
+                                TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
+                                if (transactionBufferSnapshot != null) {
+                                    handleSnapshot(transactionBufferSnapshot);
+                                    startReadCursorPosition = PositionImpl.get(
+                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                }
+                            }
+                        }
+                        closeReader(reader);
+                        return 
CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (Exception ex) {
+                        log.error("[{}] Transaction buffer recover fail when 
read "
+                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
+                        closeReader(reader);
+                        return FutureUtil.failedFuture(ex);
+                    }
+
+                },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                        .getExecutor(this));
+    }
+
+    @Override
+    public CompletableFuture<Void> clearAndCloseAsync() {

Review Comment:
   This method is only for deleting snapshots, so it doesn't need to close anything, right?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+    /**
+     * After the transaction buffer writes a transaction aborted mark to the 
topic,
+     * the transaction buffer will add the aborted transaction ID to 
AbortedTxnProcessor.

Review Comment:
   ```suggestion
        * After the transaction buffer writes a transaction aborted marker to 
the topic, the transaction buffer will put the aborted txnID and the aborted 
marker position to AbortedTxnProcessor.
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+    private final PersistentTopic topic;
+    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    private volatile PositionImpl maxReadPosition;
+    /**
+     * Aborts, map for jude message is aborted, linked for remove abort txn in 
memory when this
+     * position have been deleted.
+     */
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+
+    private volatile long lastSnapshotTimestamps;
+    private final int takeSnapshotIntervalNumber;
+
+    private final int takeSnapshotIntervalTime;
+
+
+    // when add abort or change max read position, the count will +1. Take 
snapshot will set 0 into it.
+    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new 
AtomicLong();

Review Comment:
   This can also be deleted.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+    /**
+     * After the transaction buffer writes a transaction aborted mark to the 
topic,
+     * the transaction buffer will add the aborted transaction ID to 
AbortedTxnProcessor.
+     * @param txnID aborted transaction ID.
+     */
+    void appendAbortedTxn(TxnID txnID, PositionImpl position);
+
+    /**
+     * Pulsar has a configuration for ledger retention time.
+     * If the transaction aborted mark position has been deleted, the 
transaction is valid and can be clear.
+     * In the old implementation we clear the invalid aborted txn ID one by 
one.
+     * In the new implementation, we adopt snapshot segments. And then we 
clear invalid segment by its max read position.
+     */
+    void trimExpiredAbortedTxns();
+
+    /**
+     * Check whether the transaction ID is an aborted transaction ID.
+     * @param txnID the transaction ID that needs to be checked.
+     * @param readPosition the read position of the transaction message, can 
be used to find the segment.
+     * @return a boolean, whether the transaction ID is an aborted transaction 
ID.
+     */
+    boolean checkAbortedTransaction(TxnID txnID, Position readPosition);
+
+    /**
+     * Recover transaction buffer by transaction buffer snapshot.
+     * @return a pair consists of a Boolean if the transaction buffer needs to 
recover and a Position (startReadCursorPosition) determiner where to start to 
recover in the original topic.

Review Comment:
   Only the position is returned; a pair is not used.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+    /**
+     * After the transaction buffer writes a transaction aborted mark to the 
topic,
+     * the transaction buffer will add the aborted transaction ID to 
AbortedTxnProcessor.
+     * @param txnID aborted transaction ID.

Review Comment:
   ```suggestion
        * @param txnID aborted transaction ID.
        * @param position the position of the abort txnID
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+    private final PersistentTopic topic;
+    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    private volatile PositionImpl maxReadPosition;
+    /**
+     * Aborts, map for jude message is aborted, linked for remove abort txn in 
memory when this
+     * position have been deleted.
+     */
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+
+    private volatile long lastSnapshotTimestamps;
+    private final int takeSnapshotIntervalNumber;
+
+    private final int takeSnapshotIntervalTime;
+
+
+    // when add abort or change max read position, the count will +1. Take 
snapshot will set 0 into it.
+    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new 
AtomicLong();
+
+
+    public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
+        this.topic = topic;
+        this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
+                .getTransactionBufferSnapshotServiceFactory()
+                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+        this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
+        this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+    }
+
+    @Override
+    public void appendAbortedTxn(TxnID abortedTxnId, PositionImpl position) {
+        aborts.put(abortedTxnId, position);
+    }
+
+    @Override
+    public void trimExpiredAbortedTxns() {
+        while (!aborts.isEmpty() && !((ManagedLedgerImpl) 
topic.getManagedLedger())
+                .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Topic transaction buffer clear aborted 
transaction, TxnId : {}, Position : {}",
+                        topic.getName(), aborts.firstKey(), 
aborts.get(aborts.firstKey()));
+            }
+            aborts.remove(aborts.firstKey());
+        }
+    }
+
+    @Override
+    public boolean checkAbortedTransaction(TxnID txnID, Position readPosition) 
{
+        return aborts.containsKey(txnID);
+    }
+
+
+    @Override
+    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
+        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                .getTxnBufferSnapshotService()
+                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
+                    PositionImpl startReadCursorPosition = null;
+                    try {
+                        while (reader.hasMoreEvents()) {
+                            Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                            if (topic.getName().equals(message.getKey())) {
+                                TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
+                                if (transactionBufferSnapshot != null) {
+                                    handleSnapshot(transactionBufferSnapshot);
+                                    startReadCursorPosition = PositionImpl.get(
+                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                }
+                            }
+                        }
+                        closeReader(reader);
+                        return 
CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (Exception ex) {
+                        log.error("[{}] Transaction buffer recover fail when 
read "
+                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
+                        closeReader(reader);
+                        return FutureUtil.failedFuture(ex);
+                    }
+
+                },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                        .getExecutor(this));
+    }
+
+    @Override
+    public CompletableFuture<Void> clearAndCloseAsync() {
+        return this.takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName());
+            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
+        }).thenRun(this::closeAsync);
+    }
+
+    @Override
+    public CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl 
maxReadPosition) {
+        return takeAbortedTxnSnapshot(maxReadPosition, aborts);
+    }
+
+    @Override
+    public long getLastSnapshotTimestamps() {
+        return this.lastSnapshotTimestamps;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return 
takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+    }
+
+    private void 
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
+        reader.closeAsync().exceptionally(e -> {
+            log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
+            return null;
+        });
+    }
+
+    private void handleSnapshot(TransactionBufferSnapshot snapshot) {
+        maxReadPosition = 
PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
+                snapshot.getMaxReadPositionEntryId());
+        if (snapshot.getAborts() != null) {
+            snapshot.getAborts().forEach(abortTxnMetadata ->
+                    aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(),
+                                    abortTxnMetadata.getTxnIdLeastBits()),
+                            PositionImpl.get(abortTxnMetadata.getLedgerId(),
+                                    abortTxnMetadata.getEntryId())));
+        }
+    }
+
+    private CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl 
maxReadPosition, LinkedMap<TxnID, PositionImpl> aborts) {
+        changeMaxReadPositionAndAddAbortTimes.set(0);
+        return takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            synchronized (topic) {
+                snapshot.setTopicName(topic.getName());
+                
snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
+                
snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
+                List<AbortTxnMetadata> list = new ArrayList<>();
+                aborts.forEach((k, v) -> {
+                    AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
+                    abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
+                    abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
+                    abortTxnMetadata.setLedgerId(v.getLedgerId());
+                    abortTxnMetadata.setEntryId(v.getEntryId());
+                    list.add(abortTxnMetadata);
+                });
+                snapshot.setAborts(list);
+            }
+            return writer.writeAsync(snapshot.getTopicName(), 
snapshot).thenAccept(messageId -> {
+                this.lastSnapshotTimestamps = System.currentTimeMillis();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}]Transaction buffer take snapshot success! "
+                            + "messageId : {}", topic.getName(), messageId);
+                }
+            }).exceptionally(e -> {
+                log.warn("[{}]Transaction buffer take snapshot fail! ", 
topic.getName(), e);

Review Comment:
   ```suggestion
                   log.warn("[{}]Transaction buffer take snapshot fail! ", 
topic.getName(), e.getCause());
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+    /**
+     * After the transaction buffer writes a transaction aborted mark to the 
topic,
+     * the transaction buffer will add the aborted transaction ID to 
AbortedTxnProcessor.
+     * @param txnID aborted transaction ID.
+     */
+    void appendAbortedTxn(TxnID txnID, PositionImpl position);
+
+    /**
+     * Pulsar has a configuration for ledger retention time.
+     * If the transaction aborted mark position has been deleted, the 
transaction is valid and can be clear.
+     * In the old implementation we clear the invalid aborted txn ID one by 
one.
+     * In the new implementation, we adopt snapshot segments. And then we 
clear invalid segment by its max read position.

Review Comment:
   ```suggestion
   ```
   You can add this note to the method in the implementation class.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+    /**
+     * After the transaction buffer writes a transaction aborted mark to the 
topic,
+     * the transaction buffer will add the aborted transaction ID to 
AbortedTxnProcessor.
+     * @param txnID aborted transaction ID.
+     */
+    void appendAbortedTxn(TxnID txnID, PositionImpl position);

Review Comment:
   ```suggestion
       void putAbortedTxnAndPosition(TxnID txnID, PositionImpl position);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.util.TimerTask;

Review Comment:
   delete this import



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+    private final PersistentTopic topic;
+    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    private volatile PositionImpl maxReadPosition;
+    /**
+     * Aborts, map for jude message is aborted, linked for remove abort txn in 
memory when this
+     * position have been deleted.
+     */
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+
+    private volatile long lastSnapshotTimestamps;
+    private final int takeSnapshotIntervalNumber;
+
+    private final int takeSnapshotIntervalTime;
+
+
+    // when add abort or change max read position, the count will +1. Take 
snapshot will set 0 into it.
+    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new 
AtomicLong();
+
+
+    public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
+        this.topic = topic;
+        this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
+                .getTransactionBufferSnapshotServiceFactory()
+                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+        this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
+        this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+    }
+
+    @Override
+    public void appendAbortedTxn(TxnID abortedTxnId, PositionImpl position) {
+        aborts.put(abortedTxnId, position);
+    }
+
+    @Override
+    public void trimExpiredAbortedTxns() {
+        while (!aborts.isEmpty() && !((ManagedLedgerImpl) 
topic.getManagedLedger())
+                .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Topic transaction buffer clear aborted 
transaction, TxnId : {}, Position : {}",
+                        topic.getName(), aborts.firstKey(), 
aborts.get(aborts.firstKey()));
+            }
+            aborts.remove(aborts.firstKey());
+        }
+    }
+
+    @Override
+    public boolean checkAbortedTransaction(TxnID txnID, Position readPosition) 
{
+        return aborts.containsKey(txnID);
+    }
+
+
+    @Override
+    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
+        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                .getTxnBufferSnapshotService()
+                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
+                    PositionImpl startReadCursorPosition = null;
+                    try {
+                        while (reader.hasMoreEvents()) {
+                            Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                            if (topic.getName().equals(message.getKey())) {
+                                TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
+                                if (transactionBufferSnapshot != null) {
+                                    handleSnapshot(transactionBufferSnapshot);
+                                    startReadCursorPosition = PositionImpl.get(
+                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                }
+                            }
+                        }
+                        closeReader(reader);
+                        return 
CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (Exception ex) {
+                        log.error("[{}] Transaction buffer recover fail when 
read "
+                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
+                        closeReader(reader);
+                        return FutureUtil.failedFuture(ex);
+                    }
+
+                },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                        .getExecutor(this));
+    }
+
+    @Override
+    public CompletableFuture<Void> clearAndCloseAsync() {
+        return this.takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName());
+            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
+        }).thenRun(this::closeAsync);
+    }
+
+    @Override
+    public CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl 
maxReadPosition) {
+        return takeAbortedTxnSnapshot(maxReadPosition, aborts);
+    }
+
+    @Override
+    public long getLastSnapshotTimestamps() {
+        return this.lastSnapshotTimestamps;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return 
takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+    }
+
+    private void 
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
+        reader.closeAsync().exceptionally(e -> {
+            log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
+            return null;
+        });
+    }
+
+    private void handleSnapshot(TransactionBufferSnapshot snapshot) {
+        maxReadPosition = 
PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
+                snapshot.getMaxReadPositionEntryId());
+        if (snapshot.getAborts() != null) {
+            snapshot.getAborts().forEach(abortTxnMetadata ->
+                    aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(),
+                                    abortTxnMetadata.getTxnIdLeastBits()),
+                            PositionImpl.get(abortTxnMetadata.getLedgerId(),
+                                    abortTxnMetadata.getEntryId())));
+        }
+    }
+
+    private CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl 
maxReadPosition, LinkedMap<TxnID, PositionImpl> aborts) {
+        changeMaxReadPositionAndAddAbortTimes.set(0);

Review Comment:
   delete



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,2
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SnapshotSegmentAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+
+    private ArrayList<TxnID> unsealedAbortedTxnIdSegment = new ArrayList<>();

Review Comment:
   checkAbortTxn runs on different threads, so we should use a thread-safe 
collection. We use `ConcurrentOpenHashSet` because a list would have to be 
traversed on every query, which hurts performance.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+    private final PersistentTopic topic;
+    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    private volatile PositionImpl maxReadPosition;
+    /**
+     * Aborts: a map is used to judge whether a message is aborted; it is linked so that
+     * aborted txns can be removed from memory once their positions have been deleted.
+     */
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+
+    private volatile long lastSnapshotTimestamps;
+    private final int takeSnapshotIntervalNumber;
+
+    private final int takeSnapshotIntervalTime;
+
+
+    // when add abort or change max read position, the count will +1. Take 
snapshot will set 0 into it.
+    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new 
AtomicLong();
+
+
+    public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
+        this.topic = topic;
+        this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
+                .getTransactionBufferSnapshotServiceFactory()
+                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+        this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
+        this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+    }
+
+    @Override
+    public void appendAbortedTxn(TxnID abortedTxnId, PositionImpl position) {
+        aborts.put(abortedTxnId, position);
+    }
+
+    @Override
+    public void trimExpiredAbortedTxns() {
+        while (!aborts.isEmpty() && !((ManagedLedgerImpl) 
topic.getManagedLedger())
+                .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Topic transaction buffer clear aborted 
transaction, TxnId : {}, Position : {}",
+                        topic.getName(), aborts.firstKey(), 
aborts.get(aborts.firstKey()));
+            }
+            aborts.remove(aborts.firstKey());
+        }
+    }
+
+    @Override
+    public boolean checkAbortedTransaction(TxnID txnID, Position readPosition) 
{
+        return aborts.containsKey(txnID);
+    }
+
+
+    @Override
+    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
+        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                .getTxnBufferSnapshotService()
+                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
+                    PositionImpl startReadCursorPosition = null;
+                    try {
+                        while (reader.hasMoreEvents()) {
+                            Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                            if (topic.getName().equals(message.getKey())) {
+                                TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
+                                if (transactionBufferSnapshot != null) {
+                                    handleSnapshot(transactionBufferSnapshot);
+                                    startReadCursorPosition = PositionImpl.get(
+                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                }
+                            }
+                        }
+                        closeReader(reader);
+                        return 
CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (Exception ex) {
+                        log.error("[{}] Transaction buffer recover fail when 
read "
+                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
+                        closeReader(reader);
+                        return FutureUtil.failedFuture(ex);
+                    }
+
+                },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                        .getExecutor(this));
+    }
+
+    @Override
+    public CompletableFuture<Void> clearAndCloseAsync() {
+        return this.takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName());
+            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
+        }).thenRun(this::closeAsync);
+    }
+
+    @Override
+    public CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl 
maxReadPosition) {
+        return takeAbortedTxnSnapshot(maxReadPosition, aborts);
+    }
+
+    @Override
+    public long getLastSnapshotTimestamps() {
+        return this.lastSnapshotTimestamps;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return 
takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+    }
+
+    private void 
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
+        reader.closeAsync().exceptionally(e -> {
+            log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
+            return null;
+        });
+    }
+
+    private void handleSnapshot(TransactionBufferSnapshot snapshot) {
+        maxReadPosition = 
PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
+                snapshot.getMaxReadPositionEntryId());
+        if (snapshot.getAborts() != null) {
+            snapshot.getAborts().forEach(abortTxnMetadata ->
+                    aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(),
+                                    abortTxnMetadata.getTxnIdLeastBits()),
+                            PositionImpl.get(abortTxnMetadata.getLedgerId(),
+                                    abortTxnMetadata.getEntryId())));
+        }
+    }
+
+    private CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl 
maxReadPosition, LinkedMap<TxnID, PositionImpl> aborts) {
+        changeMaxReadPositionAndAddAbortTimes.set(0);
+        return takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            synchronized (topic) {
+                snapshot.setTopicName(topic.getName());
+                
snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
+                
snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
+                List<AbortTxnMetadata> list = new ArrayList<>();
+                aborts.forEach((k, v) -> {
+                    AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
+                    abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
+                    abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
+                    abortTxnMetadata.setLedgerId(v.getLedgerId());
+                    abortTxnMetadata.setEntryId(v.getEntryId());
+                    list.add(abortTxnMetadata);
+                });
+                snapshot.setAborts(list);
+            }

Review Comment:
   ```suggestion
               TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
               snapshot.setTopicName(topic.getName());
               
snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
               snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
               List<AbortTxnMetadata> list = new ArrayList<>();
               aborts.forEach((k, v) -> {
                   AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
                   abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
                   abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
                   abortTxnMetadata.setLedgerId(v.getLedgerId());
                   abortTxnMetadata.setEntryId(v.getEntryId());
                   list.add(abortTxnMetadata);
               });
               snapshot.setAborts(list);
           return takeSnapshotWriter.thenCompose(writer -> {
    
               }
   ```
   
   In this way, we only need to make `takeAbortedTxnSnapshot`,
`updateMaxReadPosition` and `putAborts` thread-safe.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import io.netty.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+    /**
+     * After the transaction buffer writes a transaction aborted mark to the 
topic,
+     * the transaction buffer will add the aborted transaction ID to 
AbortedTxnProcessor.
+     * @param txnID aborted transaction ID.
+     */
+    void appendAbortedTxn(TxnID txnID, PositionImpl position);
+
+    /**
+     * Pulsar has a configuration for ledger retention time.
+     * If the position of a transaction's aborted mark has been deleted, the aborted
+     * transaction ID is invalid and can be cleared.
+     * In the old implementation we clear the invalid aborted txn ID one by 
one.
+     * In the new implementation, we adopt snapshot segments. And then we 
clear invalid segment by its max read position.
+     */
+    void trimExpiredAbortedTxns();
+
+    /**
+     * Check whether the transaction ID is an aborted transaction ID.
+     * @param txnID the transaction ID that needs to be checked.
+     * @param readPosition the read position of the transaction message, can 
be used to find the segment.
+     * @return a boolean, whether the transaction ID is an aborted transaction 
ID.
+     */
+    boolean checkAbortedTransaction(TxnID txnID, Position readPosition);
+
+    /**
+     * Recover transaction buffer by transaction buffer snapshot.
+     * @return a future that completes with the position (startReadCursorPosition) that
+     * determines where to start recovering in the original topic.
+     */
+
+    CompletableFuture<PositionImpl> recoverFromSnapshot();
+
+    /**
+     * Clear the snapshot/snapshot segment and index for this topic.
+     * @return a completableFuture.
+     */
+    CompletableFuture<Void> clearAndCloseAsync();

Review Comment:
   ```suggestion
        * Delete the transaction buffer aborted transaction snapshot.
        * @return a completableFuture.
        */
       CompletableFuture<Void> deleteAbortedTxnSnapshot();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+    private final PersistentTopic topic;
+    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    private volatile PositionImpl maxReadPosition;
+    /**
+     * Aborts: a map is used to judge whether a message is aborted; it is linked so that
+     * aborted txns can be removed from memory once their positions have been deleted.
+     */
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+
+    private volatile long lastSnapshotTimestamps;
+    private final int takeSnapshotIntervalNumber;
+
+    private final int takeSnapshotIntervalTime;
+
+
+    // when add abort or change max read position, the count will +1. Take 
snapshot will set 0 into it.
+    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new 
AtomicLong();
+
+
+    public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
+        this.topic = topic;
+        this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
+                .getTransactionBufferSnapshotServiceFactory()
+                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+        this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
+        this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
+                
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+    }
+
+    @Override
+    public void appendAbortedTxn(TxnID abortedTxnId, PositionImpl position) {
+        aborts.put(abortedTxnId, position);
+    }
+
+    @Override
+    public void trimExpiredAbortedTxns() {
+        while (!aborts.isEmpty() && !((ManagedLedgerImpl) 
topic.getManagedLedger())
+                .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Topic transaction buffer clear aborted 
transaction, TxnId : {}, Position : {}",
+                        topic.getName(), aborts.firstKey(), 
aborts.get(aborts.firstKey()));
+            }
+            aborts.remove(aborts.firstKey());
+        }
+    }
+
+    @Override
+    public boolean checkAbortedTransaction(TxnID txnID, Position readPosition) 
{
+        return aborts.containsKey(txnID);
+    }
+
+
+    @Override
+    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
+        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                .getTxnBufferSnapshotService()
+                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
+                    PositionImpl startReadCursorPosition = null;
+                    try {
+                        while (reader.hasMoreEvents()) {
+                            Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                            if (topic.getName().equals(message.getKey())) {
+                                TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
+                                if (transactionBufferSnapshot != null) {
+                                    handleSnapshot(transactionBufferSnapshot);
+                                    startReadCursorPosition = PositionImpl.get(
+                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                }
+                            }
+                        }
+                        closeReader(reader);
+                        return 
CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (Exception ex) {
+                        log.error("[{}] Transaction buffer recover fail when 
read "
+                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
+                        closeReader(reader);
+                        return FutureUtil.failedFuture(ex);
+                    }
+
+                },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                        .getExecutor(this));
+    }
+
+    @Override
+    public CompletableFuture<Void> clearAndCloseAsync() {
+        return this.takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName());
+            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
+        }).thenRun(this::closeAsync);
+    }
+
+    @Override
+    public CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl 
maxReadPosition) {
+        return takeAbortedTxnSnapshot(maxReadPosition, aborts);
+    }
+
+    @Override
+    public long getLastSnapshotTimestamps() {
+        return this.lastSnapshotTimestamps;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return 
takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+    }
+
+    private void 
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
+        reader.closeAsync().exceptionally(e -> {
+            log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
+            return null;
+        });
+    }
+
+    private void handleSnapshot(TransactionBufferSnapshot snapshot) {
+        maxReadPosition = 
PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
+                snapshot.getMaxReadPositionEntryId());
+        if (snapshot.getAborts() != null) {
+            snapshot.getAborts().forEach(abortTxnMetadata ->
+                    aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(),
+                                    abortTxnMetadata.getTxnIdLeastBits()),
+                            PositionImpl.get(abortTxnMetadata.getLedgerId(),
+                                    abortTxnMetadata.getEntryId())));
+        }
+    }
+
+    private CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl 
maxReadPosition, LinkedMap<TxnID, PositionImpl> aborts) {

Review Comment:
   Move this logic into `public CompletableFuture<Void> takeAbortedTxnSnapshot(PositionImpl
maxReadPosition)` instead of keeping a separate private overload.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java:
##########
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Timer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SnapshotSegmentAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+
+    private ArrayList<TxnID> unsealedAbortedTxnIdSegment = new ArrayList<>();
+
+    //Store the fixed aborted transaction segment
+    private final ConcurrentSkipListMap<PositionImpl, ArrayList<TxnID>> 
abortTxnSegments

Review Comment:
   ```suggestion
       private final ConcurrentSkipListMap<PositionImpl, Set<TxnID>> 
abortTxnSegments
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to