This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d2d61e298e [feat][txn] Transaction buffer snapshot writer reuse (#19641)
5d2d61e298e is described below

commit 5d2d61e298e337b93fb178da191bc41d311107c2
Author: ran <r...@streamnative.io>
AuthorDate: Mon Mar 20 17:41:20 2023 +0800

    [feat][txn] Transaction buffer snapshot writer reuse (#19641)
---
 .../SystemTopicTxnBufferSnapshotService.java       | 100 +++++++++++++++++----
 .../SingleSnapshotAbortedTxnProcessorImpl.java     |  21 +++--
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    |  45 +++++-----
 .../SegmentAbortedTxnProcessorTest.java            |  19 ++--
 .../TopicTransactionBufferRecoverTest.java         |  17 ++--
 .../pulsar/broker/transaction/TransactionTest.java |  85 +++++++++++++++++-
 .../broker/transaction/TransactionTestBase.java    |  20 +++++
 .../buffer/TransactionBufferCloseTest.java         |  79 ++++++++--------
 8 files changed, 293 insertions(+), 93 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index a1b78d89a13..332d754cf97 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -21,63 +21,127 @@ package org.apache.pulsar.broker.service;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
 
+@Slf4j
 public class SystemTopicTxnBufferSnapshotService<T> {
 
-    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final ConcurrentHashMap<NamespaceName, SystemTopicClient<T>> 
clients;
     protected final NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
 
     protected final Class<T> schemaType;
     protected final EventType systemTopicType;
 
+    private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> 
refCountedWriterMap;
+
+    // ReferenceCountedWriter maintains a reference count; when the count decrements to 0,
+    // the entry is removed from refCountedWriterMap and the underlying writer is closed.
+    public static class ReferenceCountedWriter<T> {
+
+        private final AtomicLong referenceCount;
+        private final NamespaceName namespaceName;
+        private final CompletableFuture<SystemTopicClient.Writer<T>> future;
+        private final SystemTopicTxnBufferSnapshotService<T> snapshotService;
+
+        public ReferenceCountedWriter(NamespaceName namespaceName,
+                                      
CompletableFuture<SystemTopicClient.Writer<T>> future,
+                                      SystemTopicTxnBufferSnapshotService<T> 
snapshotService) {
+            this.referenceCount = new AtomicLong(1);
+            this.namespaceName = namespaceName;
+            this.snapshotService = snapshotService;
+            this.future = future;
+            this.future.exceptionally(t -> {
+                        log.error("[{}] Failed to create TB snapshot writer.", 
namespaceName, t);
+                snapshotService.refCountedWriterMap.remove(namespaceName, 
this);
+                return null;
+            });
+        }
+
+        public CompletableFuture<SystemTopicClient.Writer<T>> getFuture() {
+            return future;
+        }
+
+        private synchronized boolean retain() {
+            return this.referenceCount.incrementAndGet() > 0;
+        }
+
+        public synchronized void release() {
+            if (this.referenceCount.decrementAndGet() == 0) {
+                snapshotService.refCountedWriterMap.remove(namespaceName, 
this);
+                future.thenAccept(writer -> {
+                    final String topicName = 
writer.getSystemTopicClient().getTopicName().toString();
+                    writer.closeAsync().exceptionally(t -> {
+                        if (t != null) {
+                            log.error("[{}] Failed to close TB snapshot 
writer.", topicName, t);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Success to close TB snapshot 
writer.", topicName);
+                            }
+                        }
+                        return null;
+                    });
+                });
+            }
+        }
+
+    }
+
     public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType 
systemTopicType,
                                                Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new 
NamespaceEventsSystemTopicFactory(client);
         this.systemTopicType = systemTopicType;
         this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
-    }
-
-    public CompletableFuture<SystemTopicClient.Writer<T>> 
createWriter(TopicName topicName) {
-        return 
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
+        this.refCountedWriterMap = new ConcurrentHashMap<>();
     }
 
     public CompletableFuture<SystemTopicClient.Reader<T>> 
createReader(TopicName topicName) {
-        return 
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
+        return 
getTransactionBufferSystemTopicClient(topicName.getNamespaceObject()).newReaderAsync();
     }
 
     public void removeClient(TopicName topicName, SystemTopicClientBase<T> 
transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 
0) {
-            clients.remove(topicName);
+            clients.remove(topicName.getNamespaceObject());
         }
     }
 
-    protected CompletableFuture<SystemTopicClient<T>> 
getTransactionBufferSystemTopicClient(TopicName topicName) {
+    public ReferenceCountedWriter<T> getReferenceWriter(NamespaceName 
namespaceName) {
+        return refCountedWriterMap.compute(namespaceName, (k, v) -> {
+            if (v != null && v.retain()) {
+                return v;
+            } else {
+                return new ReferenceCountedWriter<>(namespaceName,
+                        
getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(), this);
+            }
+        });
+    }
+
+    private SystemTopicClient<T> 
getTransactionBufferSystemTopicClient(NamespaceName namespaceName) {
         TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), 
systemTopicType);
+                .getSystemTopicName(namespaceName, systemTopicType);
         if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new PulsarClientException
-                            .InvalidTopicNameException("Can't create 
SystemTopicBaseTxnBufferSnapshotIndexService, "
-                            + "because the topicName is null!"));
+            throw new RuntimeException(new 
PulsarClientException.InvalidTopicNameException(
+                    "Can't get the TB system topic client for namespace " + 
namespaceName
+                            + " with type " + systemTopicType + "."));
         }
-        return 
CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
+
+        return clients.computeIfAbsent(namespaceName,
                 (v) -> namespaceEventsSystemTopicFactory
-                        
.createTransactionBufferSystemTopicClient(systemTopicName,
-                                this, schemaType)));
+                        
.createTransactionBufferSystemTopicClient(systemTopicName, this, schemaType));
     }
 
     public void close() throws Exception {
-        for (Map.Entry<TopicName, SystemTopicClient<T>> entry : 
clients.entrySet()) {
+        for (Map.Entry<NamespaceName, SystemTopicClient<T>> entry : 
clients.entrySet()) {
             entry.getValue().close();
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 87161e97512..5d582d564ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -42,7 +43,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 @Slf4j
 public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
     private final PersistentTopic topic;
-    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    private final ReferenceCountedWriter<TransactionBufferSnapshot> 
takeSnapshotWriter;
     /**
      * Aborts, map for jude message is aborted, linked for remove abort txn in 
memory when this
      * position have been deleted.
@@ -51,12 +52,14 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
 
     private volatile long lastSnapshotTimestamps;
 
+    private volatile boolean isClosed = false;
+
     public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
         this.topic = topic;
         this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
                 .getTransactionBufferSnapshotServiceFactory()
-                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
-        this.takeSnapshotWriter.exceptionally((ex) -> {
+                
.getTxnBufferSnapshotService().getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
+        this.takeSnapshotWriter.getFuture().exceptionally((ex) -> {
                     log.error("{} Failed to create snapshot writer", 
topic.getName());
                     topic.close();
                     return null;
@@ -132,7 +135,7 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
 
     @Override
     public CompletableFuture<Void> clearAbortedTxnSnapshot() {
-        return this.takeSnapshotWriter.thenCompose(writer -> {
+        return this.takeSnapshotWriter.getFuture().thenCompose(writer -> {
             TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
             snapshot.setTopicName(topic.getName());
             return writer.deleteAsync(snapshot.getTopicName(), snapshot);
@@ -141,7 +144,7 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
 
     @Override
     public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl 
maxReadPosition) {
-        return takeSnapshotWriter.thenCompose(writer -> {
+        return takeSnapshotWriter.getFuture().thenCompose(writer -> {
             TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
             snapshot.setTopicName(topic.getName());
             snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
@@ -175,8 +178,12 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
     }
 
     @Override
-    public CompletableFuture<Void> closeAsync() {
-        return 
takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+    public synchronized CompletableFuture<Void> closeAsync() {
+        if (!isClosed) {
+            isClosed = true;
+            takeSnapshotWriter.release();
+        }
+        return CompletableFuture.completedFuture(null);
     }
 
     private void 
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 7a9e0e1abed..751c03aff95 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -43,6 +43,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -442,10 +443,10 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
         private final PersistentTopic topic;
 
         //Persistent snapshot segment and index at the single thread.
-        private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshotSegment>>
-                snapshotSegmentsWriterFuture;
-        private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshotIndexes>>
-                snapshotIndexWriterFuture;
+        private final ReferenceCountedWriter<TransactionBufferSnapshotSegment> 
snapshotSegmentsWriter;
+        private final ReferenceCountedWriter<TransactionBufferSnapshotIndexes> 
snapshotIndexWriter;
+
+        private volatile boolean closed = false;
 
         private enum OperationState {
             None,
@@ -470,18 +471,20 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
 
         public PersistentWorker(PersistentTopic topic) {
             this.topic = topic;
-            this.snapshotSegmentsWriterFuture =  
this.topic.getBrokerService().getPulsar()
+            this.snapshotSegmentsWriter = 
this.topic.getBrokerService().getPulsar()
                     .getTransactionBufferSnapshotServiceFactory()
-                    
.getTxnBufferSnapshotSegmentService().createWriter(TopicName.get(topic.getName()));
-            this.snapshotSegmentsWriterFuture.exceptionally(ex -> {
+                    .getTxnBufferSnapshotSegmentService()
+                    
.getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
+            this.snapshotSegmentsWriter.getFuture().exceptionally(ex -> {
                         log.error("{} Failed to create snapshot index writer", 
topic.getName());
                         topic.close();
                         return null;
                     });
-            this.snapshotIndexWriterFuture =  
this.topic.getBrokerService().getPulsar()
+            this.snapshotIndexWriter =  
this.topic.getBrokerService().getPulsar()
                     .getTransactionBufferSnapshotServiceFactory()
-                    
.getTxnBufferSnapshotIndexService().createWriter(TopicName.get(topic.getName()));
-            this.snapshotIndexWriterFuture.exceptionally((ex) -> {
+                    .getTxnBufferSnapshotIndexService()
+                    
.getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
+            this.snapshotIndexWriter.getFuture().exceptionally((ex) -> {
                         log.error("{} Failed to create snapshot writer", 
topic.getName());
                         topic.close();
                         return null;
@@ -631,7 +634,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
             transactionBufferSnapshotSegment.setPersistentPositionLedgerId(
                     abortedMarkerPersistentPosition.getLedgerId());
 
-            return snapshotSegmentsWriterFuture.thenCompose(segmentWriter -> {
+            return 
snapshotSegmentsWriter.getFuture().thenCompose(segmentWriter -> {
                 
transactionBufferSnapshotSegment.setSequenceId(this.sequenceID.get());
                 return 
segmentWriter.writeAsync(buildKey(this.sequenceID.get()), 
transactionBufferSnapshotSegment);
             }).thenCompose((messageId) -> {
@@ -668,7 +671,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
             List<CompletableFuture<Void>> results = new ArrayList<>();
             for (PositionImpl positionNeedToDelete : positionNeedToDeletes) {
                 long sequenceIdNeedToDelete = 
indexes.get(positionNeedToDelete).getSequenceID();
-                CompletableFuture<Void> res = snapshotSegmentsWriterFuture
+                CompletableFuture<Void> res = 
snapshotSegmentsWriter.getFuture()
                         .thenCompose(writer -> 
writer.deleteAsync(buildKey(sequenceIdNeedToDelete), null))
                         .thenCompose(messageId -> {
                             if (log.isDebugEnabled()) {
@@ -695,7 +698,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
 
         private CompletableFuture<Void> 
updateSnapshotIndex(TransactionBufferSnapshotIndexesMetadata snapshotSegment) {
             TransactionBufferSnapshotIndexes snapshotIndexes = new 
TransactionBufferSnapshotIndexes();
-            CompletableFuture<Void> res = snapshotIndexWriterFuture
+            CompletableFuture<Void> res = snapshotIndexWriter.getFuture()
                     .thenCompose((indexesWriter) -> {
                         
snapshotIndexes.setIndexList(indexes.values().stream().toList());
                         snapshotIndexes.setSnapshot(snapshotSegment);
@@ -712,7 +715,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
 
         private CompletableFuture<Void> clearSnapshotSegmentAndIndexes() {
             CompletableFuture<Void> res = 
persistentWorker.clearAllSnapshotSegments()
-                    .thenCompose((ignore) -> snapshotIndexWriterFuture
+                    .thenCompose((ignore) -> snapshotIndexWriter.getFuture()
                             .thenCompose(indexesWriter -> 
indexesWriter.writeAsync(topic.getName(), null)))
                     .thenRun(() ->
                             log.debug("Successes to clear the snapshot segment 
and indexes for the topic [{}]",
@@ -747,7 +750,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
                                 Message<TransactionBufferSnapshotSegment> 
message = reader.readNextAsync()
                                         
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
                                 if 
(topic.getName().equals(message.getValue().getTopicName())) {
-                                   
snapshotSegmentsWriterFuture.get().write(message.getKey(), null);
+                                   
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
                                 }
                             }
                             return CompletableFuture.completedFuture(null);
@@ -760,11 +763,13 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
            });
         }
 
-
-        CompletableFuture<Void> closeAsync() {
-            return CompletableFuture.allOf(
-                    
this.snapshotIndexWriterFuture.thenCompose(SystemTopicClient.Writer::closeAsync),
-                    
this.snapshotSegmentsWriterFuture.thenCompose(SystemTopicClient.Writer::closeAsync));
+        synchronized CompletableFuture<Void> closeAsync() {
+            if (!closed) {
+                closed = true;
+                snapshotSegmentsWriter.release();
+                snapshotIndexWriter.release();
+            }
+            return CompletableFuture.completedFuture(null);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index ffc059de8e6..c157d7cf8c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.transaction;
 
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 import java.lang.reflect.Field;
 import java.util.LinkedList;
 import java.util.NavigableMap;
@@ -34,6 +36,7 @@ import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -125,6 +128,7 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
         newProcessor.trimExpiredAbortedTxns();
         //4. Verify the two sealed segment will be deleted.
         Awaitility.await().untilAsserted(() -> 
verifyAbortedTxnIDAndSegmentIndex(newProcessor, 11, 4));
+        processor.closeAsync().get(5, TimeUnit.SECONDS);
     }
 
     private void waitTaskExecuteCompletely(AbortedTxnProcessor processor) 
throws Exception {
@@ -177,8 +181,11 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
                 new MutablePair<>(new CompletableFuture<>(), task)));
         try {
             processor.takeAbortedTxnsSnapshot(new PositionImpl(1, 10)).get(2, 
TimeUnit.SECONDS);
+            fail("The update index operation should fail.");
         } catch (Exception e) {
             Assert.assertTrue(e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException);
+        } finally {
+            processor.closeAsync().get(5, TimeUnit.SECONDS);
         }
     }
 
@@ -200,12 +207,13 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
         SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker worker =
                 (SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker) 
field.get(processor);
         Field indexWriteFutureField = SnapshotSegmentAbortedTxnProcessorImpl
-                
.PersistentWorker.class.getDeclaredField("snapshotIndexWriterFuture");
+                
.PersistentWorker.class.getDeclaredField("snapshotIndexWriter");
         indexWriteFutureField.setAccessible(true);
-        
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshotIndexes>> 
snapshotIndexWriterFuture =
-                
(CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshotIndexes>>)
-                        indexWriteFutureField.get(worker);
-        snapshotIndexWriterFuture.get().close();
+        ReferenceCountedWriter<TransactionBufferSnapshotIndexes> 
snapshotIndexWriter =
+                (ReferenceCountedWriter<TransactionBufferSnapshotIndexes>) 
indexWriteFutureField.get(worker);
+        snapshotIndexWriter.release();
+        // After release, the writer should be closed, call close method again 
to make sure the writer was closed.
+        snapshotIndexWriter.getFuture().get().close();
         //3. Try to write a snapshot segment that will fail to update indexes.
         for (int j = 0; j < SEGMENT_SIZE; j++) {
             TxnID txnID = new TxnID(0, j);
@@ -233,6 +241,7 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
         //7. Verify the snapshot segments and index after clearing.
         verifySnapshotSegmentsSize(PROCESSOR_TOPIC, 0);
         verifySnapshotSegmentsIndexSize(PROCESSOR_TOPIC, 1);
+        processor.closeAsync().get(5, TimeUnit.SECONDS);
     }
 
     private void verifySnapshotSegmentsSize(String topic, int size) throws 
Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index d4ddb26e014..2d6622571c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
 import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -602,11 +603,12 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
                 mock(SystemTopicTxnBufferSnapshotService.class);
         SystemTopicClient.Reader<TransactionBufferSnapshot> reader = 
mock(SystemTopicClient.Reader.class);
         SystemTopicClient.Writer<TransactionBufferSnapshot> writer = 
mock(SystemTopicClient.Writer.class);
+        ReferenceCountedWriter<TransactionBufferSnapshot> refCounterWriter = 
mock(ReferenceCountedWriter.class);
+        
doReturn(CompletableFuture.completedFuture(writer)).when(refCounterWriter).getFuture();
 
         doReturn(CompletableFuture.completedFuture(reader))
                 .when(systemTopicTxnBufferSnapshotService).createReader(any());
-        doReturn(CompletableFuture.completedFuture(writer))
-                .when(systemTopicTxnBufferSnapshotService).createWriter(any());
+        
doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
         TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactory =
                 mock(TransactionBufferSnapshotServiceFactory.class);
         doReturn(systemTopicTxnBufferSnapshotService)
@@ -645,8 +647,9 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), 
false).get().get();
         // mock create writer fail
-        doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
-                .when(systemTopicTxnBufferSnapshotService).createWriter(any());
+        ReferenceCountedWriter<TransactionBufferSnapshot> failedCountedWriter 
= mock(ReferenceCountedWriter.class);
+        doReturn(FutureUtil.failedFuture(new 
PulsarClientException("test"))).when(failedCountedWriter).getFuture();
+        
doReturn(failedCountedWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
         checkCloseTopic(pulsarClient, 
transactionBufferSnapshotServiceFactoryOriginal,
                 transactionBufferSnapshotServiceFactory, originalTopic, field, 
producer);
     }
@@ -703,7 +706,8 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
                 new 
TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService();
 
         SystemTopicClient.Writer<TransactionBufferSnapshotIndexes> 
indexesWriter =
-                
transactionBufferSnapshotIndexService.createWriter(TopicName.get(SNAPSHOT_INDEX)).get();
+                transactionBufferSnapshotIndexService.getReferenceWriter(
+                        
TopicName.get(SNAPSHOT_INDEX).getNamespaceObject()).getFuture().get();
 
         SystemTopicClient.Reader<TransactionBufferSnapshotIndexes> 
indexesReader =
                 
transactionBufferSnapshotIndexService.createReader(TopicName.get(SNAPSHOT_INDEX)).get();
@@ -764,7 +768,8 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
                 new 
TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService();
 
         SystemTopicClient.Writer<TransactionBufferSnapshotSegment>
-                segmentWriter = 
transactionBufferSnapshotSegmentService.createWriter(snapshotSegmentTopicName).get();
+                segmentWriter = transactionBufferSnapshotSegmentService
+                
.getReferenceWriter(snapshotSegmentTopicName.getNamespaceObject()).getFuture().get();
 
         // write two snapshot to snapshot segment topic
         TransactionBufferSnapshotSegment snapshot =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 20aeac0ed64..c3533e70cf8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -86,6 +86,7 @@ import org.apache.pulsar.broker.service.BacklogQuotaManager;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
 import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -1532,8 +1533,10 @@ public class TransactionTest extends TransactionTestBase 
{
                 = mock(SystemTopicTxnBufferSnapshotService.class);
         SystemTopicClient.Writer<TransactionBufferSnapshot> writer = 
mock(SystemTopicClient.Writer.class);
         
when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
-        when(systemTopicTxnBufferSnapshotService.createWriter(any()))
-                .thenReturn(CompletableFuture.completedFuture(writer));
+        ReferenceCountedWriter<TransactionBufferSnapshot> refCounterWriter = 
mock(ReferenceCountedWriter.class);
+        
doReturn(CompletableFuture.completedFuture(writer)).when(refCounterWriter).getFuture();
+        when(systemTopicTxnBufferSnapshotService.getReferenceWriter(any()))
+                .thenReturn(refCounterWriter);
         TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactory =
                 mock(TransactionBufferSnapshotServiceFactory.class);
         
when(transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService())
@@ -1676,4 +1679,82 @@ public class TransactionTest extends TransactionTestBase {
         admin.namespaces().deleteNamespace(namespace, true);
     }
 
+
+    @Test(timeOut = 10_000)
+    public void testTBSnapshotWriter() throws Exception {
+        String namespace = TENANT + "/ns-" + RandomStringUtils.randomAlphabetic(5);
+        admin.namespaces().createNamespace(namespace, 16);
+        String topic = namespace + "/test-create-snapshot-writer-failed";
+        int partitionCount = 20;
+        admin.topics().createPartitionedTopic(topic, partitionCount);
+
+        Class<SystemTopicTxnBufferSnapshotService> clazz = SystemTopicTxnBufferSnapshotService.class;
+        Field field = clazz.getDeclaredField("refCountedWriterMap");
+        field.setAccessible(true);
+        // inject a still-pending writer future into every snapshot service; it is failed explicitly below
+        CompletableFuture<SystemTopicClient.Writer<?>> writerFuture = new CompletableFuture<>();
+        for (PulsarService pulsarService : pulsarServiceList) {
+            SystemTopicTxnBufferSnapshotService bufferSnapshotService =
+                    pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService();
+            ConcurrentHashMap<NamespaceName, ReferenceCountedWriter> writerMap1 =
+                    ((ConcurrentHashMap<NamespaceName, ReferenceCountedWriter>) field.get(bufferSnapshotService));
+            ReferenceCountedWriter failedCountedWriter =
+                    new ReferenceCountedWriter(NamespaceName.get(namespace), writerFuture, bufferSnapshotService);
+            writerMap1.put(NamespaceName.get(namespace), failedCountedWriter);
+
+            SystemTopicTxnBufferSnapshotService segmentSnapshotService =
+                    pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotSegmentService();
+            ConcurrentHashMap<NamespaceName, ReferenceCountedWriter> writerMap2 =
+                    ((ConcurrentHashMap<NamespaceName, ReferenceCountedWriter>) field.get(segmentSnapshotService));
+            ReferenceCountedWriter failedCountedWriter2 =
+                    new ReferenceCountedWriter(NamespaceName.get(namespace), writerFuture, segmentSnapshotService);
+            writerMap2.put(NamespaceName.get(namespace), failedCountedWriter2);
+
+            SystemTopicTxnBufferSnapshotService indexSnapshotService =
+                    pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotIndexService();
+            ConcurrentHashMap<NamespaceName, ReferenceCountedWriter> writerMap3 =
+                    ((ConcurrentHashMap<NamespaceName, ReferenceCountedWriter>) field.get(indexSnapshotService));
+            ReferenceCountedWriter failedCountedWriter3 =
+                    new ReferenceCountedWriter(NamespaceName.get(namespace), writerFuture, indexSnapshotService);
+            writerMap3.put(NamespaceName.get(namespace), failedCountedWriter3);
+        }
+
+        CompletableFuture<Producer<byte[]>> producerFuture = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .createAsync();
+        waitForTopicLoaded("persistent://" + topic + "-partition-0");
+        Thread.sleep(3000);
+        // the producer shouldn't be created, because the transaction buffer snapshot writer future didn't finish.
+        assertFalse(producerFuture.isDone());
+
+        // Once the writer future fails, the topic is closed and the failed writer is removed,
+        // so the producer reconnects and then works normally.
+        writerFuture.completeExceptionally(new PulsarClientException.TopicTerminatedException("failed writer"));
+        Producer<byte[]> producer = producerFuture.get();
+
+        for (int i = 0; i < partitionCount * 2; i++) {
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+            producer.newMessage(txn).value("test".getBytes()).sendAsync();
+            txn.commit().get();
+        }
+        checkSnapshotPublisherCount(namespace, 1);
+        producer.close();
+        admin.topics().unload(topic);
+        checkSnapshotPublisherCount(namespace, 0);
+    }
+
+    private void waitForTopicLoaded(String topicName) {
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+                .until(() -> {
+                    for (PulsarService pulsarService : pulsarServiceList) {
+                        if (pulsarService.getBrokerService().getTopicReference(topicName).isPresent()) {
+                            return true;
+                        }
+                    }
+                    return false;
+                });
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index fd49354342f..f45eda8d21f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -41,12 +41,17 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.tests.TestRetrySupport;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
 
 @Slf4j
 public abstract class TransactionTestBase extends TestRetrySupport {
@@ -223,4 +228,19 @@ public abstract class TransactionTestBase extends TestRetrySupport {
             throws Exception {
         MockedPulsarServiceBaseTest.deleteNamespaceWithRetry(ns, force, admin, pulsarServiceList);
     }
+
+    public void checkSnapshotPublisherCount(String namespace, int expectCount) {
+        TopicName snTopicName = TopicName.get(TopicDomain.persistent.value(), NamespaceName.get(namespace),
+                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+        Awaitility.await()
+                .atMost(5, TimeUnit.SECONDS)
+                .pollInterval(100, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    List<? extends PublisherStats> publisherStatsList =
+                            admin.topics()
+                                    .getStats(snTopicName.getPartitionedTopicName()).getPublishers();
+                    Assert.assertEquals(publisherStatsList.size(), expectCount);
+                });
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
index cd3a14da596..e92cf29521e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.broker.transaction.buffer;
 
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -26,20 +29,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Transaction buffer close test.
@@ -71,49 +67,62 @@ public class TransactionBufferCloseTest extends TransactionTestBase {
 
     @Test(timeOut = 10_000, dataProvider = "isPartition")
     public void deleteTopicCloseTransactionBufferTest(boolean isPartition) throws Exception {
-        int expectedCount = isPartition ? 30 : 1;
-        TopicName topicName = createAndLoadTopic(isPartition, expectedCount);
-        checkSnapshotPublisherCount(topicName.getNamespace(), expectedCount);
+        int partitionCount = isPartition ? 30 : 1;
+        List<TopicName> topicNames = createAndLoadTopics(isPartition, partitionCount);
+        String namespaceName = topicNames.get(0).getNamespace();
+        checkSnapshotPublisherCount(namespaceName, 1);
+
+        for (int i = 0; i < topicNames.size(); i++) {
+            deleteTopic(isPartition, topicNames.get(i));
+            // Once every topic in the namespace has been deleted, the publisher count should drop to 0.
+            int expectCount = i == topicNames.size() - 1 ? 0 : 1;
+            checkSnapshotPublisherCount(namespaceName, expectCount);
+        }
+    }
+
+    private void deleteTopic(boolean isPartition, TopicName topicName) throws PulsarAdminException {
         if (isPartition) {
             admin.topics().deletePartitionedTopic(topicName.getPartitionedTopicName(), true);
         } else {
             admin.topics().delete(topicName.getPartitionedTopicName(), true);
         }
-        checkSnapshotPublisherCount(topicName.getNamespace(), 0);
     }
 
     @Test(timeOut = 10_000, dataProvider = "isPartition")
     public void unloadTopicCloseTransactionBufferTest(boolean isPartition) throws Exception {
-        int expectedCount = isPartition ? 30 : 1;
-        TopicName topicName = createAndLoadTopic(isPartition, expectedCount);
-        checkSnapshotPublisherCount(topicName.getNamespace(), expectedCount);
-        admin.topics().unload(topicName.getPartitionedTopicName());
-        checkSnapshotPublisherCount(topicName.getNamespace(), 0);
+        int partitionCount = isPartition ? 30 : 1;
+        List<TopicName> topicNames = createAndLoadTopics(isPartition, partitionCount);
+        String namespaceName = topicNames.get(0).getNamespace();
+        checkSnapshotPublisherCount(namespaceName, 1);
+
+        for (int i = 0; i < topicNames.size(); i++) {
+            admin.topics().unload(topicNames.get(i).getPartitionedTopicName());
+            // Once every topic in the namespace has been unloaded, the publisher count should drop to 0.
+            int expectCount = i == topicNames.size() - 1 ? 0 : 1;
+            checkSnapshotPublisherCount(namespaceName, expectCount);
+        }
     }
 
-    private TopicName createAndLoadTopic(boolean isPartition, int partitionCount)
+    private List<TopicName> createAndLoadTopics(boolean isPartition, int partitionCount)
             throws PulsarAdminException, PulsarClientException {
         String namespace = TENANT + "/ns-" + RandomStringUtils.randomAlphabetic(5);
         admin.namespaces().createNamespace(namespace, 3);
-        String topic = namespace + "/tb-close-test-";
-        if (isPartition) {
-            admin.topics().createPartitionedTopic(topic, partitionCount);
-        }
-        pulsarClient.newProducer()
-                .topic(topic)
-                .sendTimeout(0, TimeUnit.SECONDS)
-                .create()
-                .close();
-        return TopicName.get(topic);
-    }
+        String topic = namespace + "/tb-close-test";
+        List<TopicName> topics = new ArrayList<>();
 
-    private void checkSnapshotPublisherCount(String namespace, int expectCount) throws PulsarAdminException {
-        TopicName snTopicName = TopicName.get(TopicDomain.persistent.value(), NamespaceName.get(namespace),
-                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-        List<PublisherStats> publisherStatsList =
-                (List<PublisherStats>) admin.topics()
-                        .getStats(snTopicName.getPartitionedTopicName()).getPublishers();
-        Assert.assertEquals(publisherStatsList.size(), expectCount);
+        for (int i = 0; i < 2; i++) {
+            String t = topic + "-" + i;
+            if (isPartition) {
+                admin.topics().createPartitionedTopic(t, partitionCount);
+            }
+            pulsarClient.newProducer()
+                    .topic(t)
+                    .sendTimeout(0, TimeUnit.SECONDS)
+                    .create()
+                    .close();
+            topics.add(TopicName.get(t));
+        }
+        return topics;
     }
 
 }


Reply via email to