This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5d2d61e298e [feat][txn] Transaction buffer snapshot writer reuse (#19641) 5d2d61e298e is described below commit 5d2d61e298e337b93fb178da191bc41d311107c2 Author: ran <r...@streamnative.io> AuthorDate: Mon Mar 20 17:41:20 2023 +0800 [feat][txn] Transaction buffer snapshot writer reuse (#19641) --- .../SystemTopicTxnBufferSnapshotService.java | 100 +++++++++++++++++---- .../SingleSnapshotAbortedTxnProcessorImpl.java | 21 +++-- .../SnapshotSegmentAbortedTxnProcessorImpl.java | 45 +++++----- .../SegmentAbortedTxnProcessorTest.java | 19 ++-- .../TopicTransactionBufferRecoverTest.java | 17 ++-- .../pulsar/broker/transaction/TransactionTest.java | 85 +++++++++++++++++- .../broker/transaction/TransactionTestBase.java | 20 +++++ .../buffer/TransactionBufferCloseTest.java | 79 ++++++++-------- 8 files changed, 293 insertions(+), 93 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java index a1b78d89a13..332d754cf97 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java @@ -21,63 +21,127 @@ package org.apache.pulsar.broker.service; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.systopic.SystemTopicClientBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.events.EventType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; +@Slf4j public class SystemTopicTxnBufferSnapshotService<T> { - protected final Map<TopicName, SystemTopicClient<T>> clients; + protected final ConcurrentHashMap<NamespaceName, SystemTopicClient<T>> clients; protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory; protected final Class<T> schemaType; protected final EventType systemTopicType; + private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> refCountedWriterMap; + + // The class ReferenceCountedWriter will maintain the reference count, + // when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed. + public static class ReferenceCountedWriter<T> { + + private final AtomicLong referenceCount; + private final NamespaceName namespaceName; + private final CompletableFuture<SystemTopicClient.Writer<T>> future; + private final SystemTopicTxnBufferSnapshotService<T> snapshotService; + + public ReferenceCountedWriter(NamespaceName namespaceName, + CompletableFuture<SystemTopicClient.Writer<T>> future, + SystemTopicTxnBufferSnapshotService<T> snapshotService) { + this.referenceCount = new AtomicLong(1); + this.namespaceName = namespaceName; + this.snapshotService = snapshotService; + this.future = future; + this.future.exceptionally(t -> { + log.error("[{}] Failed to create TB snapshot writer.", namespaceName, t); + snapshotService.refCountedWriterMap.remove(namespaceName, this); + return null; + }); + } + + public CompletableFuture<SystemTopicClient.Writer<T>> getFuture() { + return future; + } + + private synchronized boolean retain() { + return this.referenceCount.incrementAndGet() > 0; + } + + public synchronized void release() { + if (this.referenceCount.decrementAndGet() == 0) { + snapshotService.refCountedWriterMap.remove(namespaceName, this); + future.thenAccept(writer -> { + final String topicName = writer.getSystemTopicClient().getTopicName().toString(); + writer.closeAsync().exceptionally(t -> { + if (t != null) { + log.error("[{}] Failed to close TB snapshot writer.", topicName, t); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Success to close TB snapshot writer.", topicName); + } + } + return null; + }); + }); + } + } + + } + public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType, Class<T> schemaType) { this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client); this.systemTopicType = systemTopicType; this.schemaType = schemaType; this.clients = new ConcurrentHashMap<>(); - } - - public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) { - return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync); + this.refCountedWriterMap = new ConcurrentHashMap<>(); } public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) { - return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync); + return getTransactionBufferSystemTopicClient(topicName.getNamespaceObject()).newReaderAsync(); } public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) { if (transactionBufferSystemTopicClient.getReaders().size() == 0 && transactionBufferSystemTopicClient.getWriters().size() == 0) { - clients.remove(topicName); + clients.remove(topicName.getNamespaceObject()); } } - protected CompletableFuture<SystemTopicClient<T>> getTransactionBufferSystemTopicClient(TopicName topicName) { + public ReferenceCountedWriter<T> getReferenceWriter(NamespaceName namespaceName) { + return refCountedWriterMap.compute(namespaceName, (k, v) -> { + if (v != null && v.retain()) { + return v; + } else { + return new ReferenceCountedWriter<>(namespaceName, + getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(), this); + } + }); + } + + private SystemTopicClient<T> getTransactionBufferSystemTopicClient(NamespaceName namespaceName) { TopicName systemTopicName = NamespaceEventsSystemTopicFactory - .getSystemTopicName(topicName.getNamespaceObject(), systemTopicType); + .getSystemTopicName(namespaceName, systemTopicType); if (systemTopicName == null) { - return FutureUtil.failedFuture( - new PulsarClientException - .InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotIndexService, " - + "because the topicName is null!")); + throw new RuntimeException(new PulsarClientException.InvalidTopicNameException( + "Can't get the TB system topic client for namespace " + namespaceName + + " with type " + systemTopicType + ".")); } - return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName, + + return clients.computeIfAbsent(namespaceName, (v) -> namespaceEventsSystemTopicFactory - .createTransactionBufferSystemTopicClient(systemTopicName, - this, schemaType))); + .createTransactionBufferSystemTopicClient(systemTopicName, this, schemaType)); } public void close() throws Exception { - for (Map.Entry<TopicName, SystemTopicClient<T>> entry : clients.entrySet()) { + for (Map.Entry<NamespaceName, SystemTopicClient<T>> entry : clients.entrySet()) { entry.getValue().close(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java index 87161e97512..5d582d564ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.map.LinkedMap; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; @@ -42,7 +43,7 @@ import org.apache.pulsar.common.util.FutureUtil; @Slf4j public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor { private final PersistentTopic topic; - private final CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter; + private final ReferenceCountedWriter<TransactionBufferSnapshot> takeSnapshotWriter; /** * Aborts, map for jude message is aborted, linked for remove abort txn in memory when this * position have been deleted. @@ -51,12 +52,14 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso private volatile long lastSnapshotTimestamps; + private volatile boolean isClosed = false; + public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) { this.topic = topic; this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar() .getTransactionBufferSnapshotServiceFactory() - .getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName())); - this.takeSnapshotWriter.exceptionally((ex) -> { + .getTxnBufferSnapshotService().getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject()); + this.takeSnapshotWriter.getFuture().exceptionally((ex) -> { log.error("{} Failed to create snapshot writer", topic.getName()); topic.close(); return null; @@ -132,7 +135,7 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso @Override public CompletableFuture<Void> clearAbortedTxnSnapshot() { - return this.takeSnapshotWriter.thenCompose(writer -> { + return this.takeSnapshotWriter.getFuture().thenCompose(writer -> { TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot(); snapshot.setTopicName(topic.getName()); return writer.deleteAsync(snapshot.getTopicName(), snapshot); @@ -141,7 +144,7 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso @Override public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl maxReadPosition) { - return takeSnapshotWriter.thenCompose(writer -> { + return takeSnapshotWriter.getFuture().thenCompose(writer -> { TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot(); snapshot.setTopicName(topic.getName()); snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId()); @@ -175,8 +178,12 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso } @Override - public CompletableFuture<Void> closeAsync() { - return takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync); + public synchronized CompletableFuture<Void> closeAsync() { + if (!isClosed) { + isClosed = true; + takeSnapshotWriter.release(); + } + return CompletableFuture.completedFuture(null); } private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index 7a9e0e1abed..751c03aff95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -43,6 +43,7 @@ import org.apache.commons.collections4.map.LinkedMap; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; @@ -442,10 +443,10 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess private final PersistentTopic topic; //Persistent snapshot segment and index at the single thread. - private final CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshotSegment>> - snapshotSegmentsWriterFuture; - private final CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshotIndexes>> - snapshotIndexWriterFuture; + private final ReferenceCountedWriter<TransactionBufferSnapshotSegment> snapshotSegmentsWriter; + private final ReferenceCountedWriter<TransactionBufferSnapshotIndexes> snapshotIndexWriter; + + private volatile boolean closed = false; private enum OperationState { None, @@ -470,18 +471,20 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess public PersistentWorker(PersistentTopic topic) { this.topic = topic; - this.snapshotSegmentsWriterFuture = this.topic.getBrokerService().getPulsar() + this.snapshotSegmentsWriter = this.topic.getBrokerService().getPulsar() .getTransactionBufferSnapshotServiceFactory() - .getTxnBufferSnapshotSegmentService().createWriter(TopicName.get(topic.getName())); - this.snapshotSegmentsWriterFuture.exceptionally(ex -> { + .getTxnBufferSnapshotSegmentService() + .getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject()); + this.snapshotSegmentsWriter.getFuture().exceptionally(ex -> { log.error("{} Failed to create snapshot index writer", topic.getName()); topic.close(); return null; }); - this.snapshotIndexWriterFuture = this.topic.getBrokerService().getPulsar() + this.snapshotIndexWriter = this.topic.getBrokerService().getPulsar() .getTransactionBufferSnapshotServiceFactory() - .getTxnBufferSnapshotIndexService().createWriter(TopicName.get(topic.getName())); - this.snapshotIndexWriterFuture.exceptionally((ex) -> { + .getTxnBufferSnapshotIndexService() + .getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject()); + this.snapshotIndexWriter.getFuture().exceptionally((ex) -> { log.error("{} Failed to create snapshot writer", topic.getName()); topic.close(); return null; @@ -631,7 +634,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess transactionBufferSnapshotSegment.setPersistentPositionLedgerId( abortedMarkerPersistentPosition.getLedgerId()); - return snapshotSegmentsWriterFuture.thenCompose(segmentWriter -> { + return snapshotSegmentsWriter.getFuture().thenCompose(segmentWriter -> { transactionBufferSnapshotSegment.setSequenceId(this.sequenceID.get()); return segmentWriter.writeAsync(buildKey(this.sequenceID.get()), transactionBufferSnapshotSegment); }).thenCompose((messageId) -> { @@ -668,7 +671,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess List<CompletableFuture<Void>> results = new ArrayList<>(); for (PositionImpl positionNeedToDelete : positionNeedToDeletes) { long sequenceIdNeedToDelete = indexes.get(positionNeedToDelete).getSequenceID(); - CompletableFuture<Void> res = snapshotSegmentsWriterFuture + CompletableFuture<Void> res = snapshotSegmentsWriter.getFuture() .thenCompose(writer -> writer.deleteAsync(buildKey(sequenceIdNeedToDelete), null)) .thenCompose(messageId -> { if (log.isDebugEnabled()) { @@ -695,7 +698,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess private CompletableFuture<Void> updateSnapshotIndex(TransactionBufferSnapshotIndexesMetadata snapshotSegment) { TransactionBufferSnapshotIndexes snapshotIndexes = new TransactionBufferSnapshotIndexes(); - CompletableFuture<Void> res = snapshotIndexWriterFuture + CompletableFuture<Void> res = snapshotIndexWriter.getFuture() .thenCompose((indexesWriter) -> { snapshotIndexes.setIndexList(indexes.values().stream().toList()); snapshotIndexes.setSnapshot(snapshotSegment); @@ -712,7 +715,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess private CompletableFuture<Void> clearSnapshotSegmentAndIndexes() { CompletableFuture<Void> res = persistentWorker.clearAllSnapshotSegments() - .thenCompose((ignore) -> snapshotIndexWriterFuture + .thenCompose((ignore) -> snapshotIndexWriter.getFuture() .thenCompose(indexesWriter -> indexesWriter.writeAsync(topic.getName(), null))) .thenRun(() -> log.debug("Successes to clear the snapshot segment and indexes for the topic [{}]", @@ -747,7 +750,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess Message<TransactionBufferSnapshotSegment> message = reader.readNextAsync() .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS); if (topic.getName().equals(message.getValue().getTopicName())) { - snapshotSegmentsWriterFuture.get().write(message.getKey(), null); + snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null); } } return CompletableFuture.completedFuture(null); @@ -760,11 +763,13 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess }); } - - CompletableFuture<Void> closeAsync() { - return CompletableFuture.allOf( - this.snapshotIndexWriterFuture.thenCompose(SystemTopicClient.Writer::closeAsync), - this.snapshotSegmentsWriterFuture.thenCompose(SystemTopicClient.Writer::closeAsync)); + synchronized CompletableFuture<Void> closeAsync() { + if (!closed) { + closed = true; + snapshotSegmentsWriter.release(); + snapshotIndexWriter.release(); + } + return CompletableFuture.completedFuture(null); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java index ffc059de8e6..c157d7cf8c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.transaction; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + import java.lang.reflect.Field; import java.util.LinkedList; import java.util.NavigableMap; @@ -34,6 +36,7 @@ import org.apache.commons.collections4.map.LinkedMap; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; @@ -125,6 +128,7 @@ public class SegmentAbortedTxnProcessorTest extends TransactionTestBase { newProcessor.trimExpiredAbortedTxns(); //4. Verify the two sealed segment will be deleted. Awaitility.await().untilAsserted(() -> verifyAbortedTxnIDAndSegmentIndex(newProcessor, 11, 4)); + processor.closeAsync().get(5, TimeUnit.SECONDS); } private void waitTaskExecuteCompletely(AbortedTxnProcessor processor) throws Exception { @@ -177,8 +181,11 @@ public class SegmentAbortedTxnProcessorTest extends TransactionTestBase { new MutablePair<>(new CompletableFuture<>(), task))); try { processor.takeAbortedTxnsSnapshot(new PositionImpl(1, 10)).get(2, TimeUnit.SECONDS); + fail("The update index operation should fail."); } catch (Exception e) { Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException); + } finally { + processor.closeAsync().get(5, TimeUnit.SECONDS); } } @@ -200,12 +207,13 @@ public class SegmentAbortedTxnProcessorTest extends TransactionTestBase { SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker worker = (SnapshotSegmentAbortedTxnProcessorImpl.PersistentWorker) field.get(processor); Field indexWriteFutureField = SnapshotSegmentAbortedTxnProcessorImpl - .PersistentWorker.class.getDeclaredField("snapshotIndexWriterFuture"); + .PersistentWorker.class.getDeclaredField("snapshotIndexWriter"); indexWriteFutureField.setAccessible(true); - CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshotIndexes>> snapshotIndexWriterFuture = - (CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshotIndexes>>) - indexWriteFutureField.get(worker); - snapshotIndexWriterFuture.get().close(); + ReferenceCountedWriter<TransactionBufferSnapshotIndexes> snapshotIndexWriter = + (ReferenceCountedWriter<TransactionBufferSnapshotIndexes>) indexWriteFutureField.get(worker); + snapshotIndexWriter.release(); + // After release, the writer should be closed, call close method again to make sure the writer was closed. + snapshotIndexWriter.getFuture().get().close(); //3. Try to write a snapshot segment that will fail to update indexes. for (int j = 0; j < SEGMENT_SIZE; j++) { TxnID txnID = new TxnID(0, j); @@ -233,6 +241,7 @@ public class SegmentAbortedTxnProcessorTest extends TransactionTestBase { //7. Verify the snapshot segments and index after clearing. verifySnapshotSegmentsSize(PROCESSOR_TOPIC, 0); verifySnapshotSegmentsIndexSize(PROCESSOR_TOPIC, 1); + processor.closeAsync().get(5, TimeUnit.SECONDS); } private void verifySnapshotSegmentsSize(String topic, int size) throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index d4ddb26e014..2d6622571c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -57,6 +57,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService; +import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -602,11 +603,12 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase { mock(SystemTopicTxnBufferSnapshotService.class); SystemTopicClient.Reader<TransactionBufferSnapshot> reader = mock(SystemTopicClient.Reader.class); SystemTopicClient.Writer<TransactionBufferSnapshot> writer = mock(SystemTopicClient.Writer.class); + ReferenceCountedWriter<TransactionBufferSnapshot> refCounterWriter = mock(ReferenceCountedWriter.class); + doReturn(CompletableFuture.completedFuture(writer)).when(refCounterWriter).getFuture(); doReturn(CompletableFuture.completedFuture(reader)) .when(systemTopicTxnBufferSnapshotService).createReader(any()); - doReturn(CompletableFuture.completedFuture(writer)) - .when(systemTopicTxnBufferSnapshotService).createWriter(any()); + doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any()); TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory = mock(TransactionBufferSnapshotServiceFactory.class); doReturn(systemTopicTxnBufferSnapshotService) @@ -645,8 +647,9 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase { originalTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get(); // mock create writer fail - doReturn(FutureUtil.failedFuture(new PulsarClientException("test"))) - .when(systemTopicTxnBufferSnapshotService).createWriter(any()); + ReferenceCountedWriter<TransactionBufferSnapshot> failedCountedWriter = mock(ReferenceCountedWriter.class); + doReturn(FutureUtil.failedFuture(new PulsarClientException("test"))).when(failedCountedWriter).getFuture(); + doReturn(failedCountedWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any()); checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceFactoryOriginal, transactionBufferSnapshotServiceFactory, originalTopic, field, producer); } @@ -703,7 +706,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase { new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService(); SystemTopicClient.Writer<TransactionBufferSnapshotIndexes> indexesWriter = - transactionBufferSnapshotIndexService.createWriter(TopicName.get(SNAPSHOT_INDEX)).get(); + transactionBufferSnapshotIndexService.getReferenceWriter( + TopicName.get(SNAPSHOT_INDEX).getNamespaceObject()).getFuture().get(); SystemTopicClient.Reader<TransactionBufferSnapshotIndexes> indexesReader = transactionBufferSnapshotIndexService.createReader(TopicName.get(SNAPSHOT_INDEX)).get(); @@ -764,7 +768,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase { new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService(); SystemTopicClient.Writer<TransactionBufferSnapshotSegment> - segmentWriter = transactionBufferSnapshotSegmentService.createWriter(snapshotSegmentTopicName).get(); + segmentWriter = transactionBufferSnapshotSegmentService + .getReferenceWriter(snapshotSegmentTopicName.getNamespaceObject()).getFuture().get(); // write two snapshot to snapshot segment topic TransactionBufferSnapshotSegment snapshot = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 20aeac0ed64..c3533e70cf8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -86,6 +86,7 @@ import org.apache.pulsar.broker.service.BacklogQuotaManager; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService; +import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -1532,8 +1533,10 @@ public class TransactionTest extends TransactionTestBase { = mock(SystemTopicTxnBufferSnapshotService.class); SystemTopicClient.Writer<TransactionBufferSnapshot> writer = mock(SystemTopicClient.Writer.class); when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); - when(systemTopicTxnBufferSnapshotService.createWriter(any())) - .thenReturn(CompletableFuture.completedFuture(writer)); + ReferenceCountedWriter<TransactionBufferSnapshot> refCounterWriter = mock(ReferenceCountedWriter.class); + doReturn(CompletableFuture.completedFuture(writer)).when(refCounterWriter).getFuture(); + when(systemTopicTxnBufferSnapshotService.getReferenceWriter(any())) + .thenReturn(refCounterWriter); TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory = mock(TransactionBufferSnapshotServiceFactory.class); when(transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService()) @@ -1676,4 +1679,82 @@ public class TransactionTest extends TransactionTestBase { admin.namespaces().deleteNamespace(namespace, true); } + + @Test(timeOut = 10_000) + public void testTBSnapshotWriter() throws Exception { + String namespace = TENANT + "/ns-" + RandomStringUtils.randomAlphabetic(5); + admin.namespaces().createNamespace(namespace, 16); + String topic = namespace + "/test-create-snapshot-writer-failed"; + int partitionCount = 20; + admin.topics().createPartitionedTopic(topic, partitionCount); + + Class<SystemTopicTxnBufferSnapshotService> clazz = SystemTopicTxnBufferSnapshotService.class; + Field field = clazz.getDeclaredField("refCountedWriterMap"); + field.setAccessible(true); + // inject a failed writer future + CompletableFuture<SystemTopicClient.Writer<?>> writerFuture = new CompletableFuture<>(); + for (PulsarService pulsarService : pulsarServiceList) { + SystemTopicTxnBufferSnapshotService bufferSnapshotService = + pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService(); + ConcurrentHashMap<NamespaceName, ReferenceCountedWriter> writerMap1 = + ((ConcurrentHashMap<NamespaceName, ReferenceCountedWriter>) field.get(bufferSnapshotService)); + ReferenceCountedWriter failedCountedWriter = + new ReferenceCountedWriter(NamespaceName.get(namespace), writerFuture, bufferSnapshotService); + writerMap1.put(NamespaceName.get(namespace), failedCountedWriter); + + SystemTopicTxnBufferSnapshotService segmentSnapshotService = + pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotSegmentService(); + ConcurrentHashMap<NamespaceName, ReferenceCountedWriter> writerMap2 = + ((ConcurrentHashMap<NamespaceName, ReferenceCountedWriter>) field.get(segmentSnapshotService)); + ReferenceCountedWriter failedCountedWriter2 = + new ReferenceCountedWriter(NamespaceName.get(namespace), writerFuture, segmentSnapshotService); + writerMap2.put(NamespaceName.get(namespace), failedCountedWriter2); + + SystemTopicTxnBufferSnapshotService indexSnapshotService = + pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotIndexService(); + ConcurrentHashMap<NamespaceName, ReferenceCountedWriter> writerMap3 = + ((ConcurrentHashMap<NamespaceName, ReferenceCountedWriter>) field.get(indexSnapshotService)); + ReferenceCountedWriter failedCountedWriter3 = + new ReferenceCountedWriter(NamespaceName.get(namespace), writerFuture, indexSnapshotService); + writerMap3.put(NamespaceName.get(namespace), failedCountedWriter3); + } + + CompletableFuture<Producer<byte[]>> producerFuture = pulsarClient.newProducer() + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .createAsync(); + getTopic("persistent://" + topic + "-partition-0"); + Thread.sleep(3000); + // the producer shouldn't be created, because the transaction buffer snapshot writer future didn't finish. + assertFalse(producerFuture.isDone()); + + // The topic will be closed, because the transaction buffer snapshot writer future is failed, + // the failed writer future will be removed, the producer will be reconnected and work well. + writerFuture.completeExceptionally(new PulsarClientException.TopicTerminatedException("failed writer")); + Producer<byte[]> producer = producerFuture.get(); + + for (int i = 0; i < partitionCount * 2; i++) { + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + producer.newMessage(txn).value("test".getBytes()).sendAsync(); + txn.commit().get(); + } + checkSnapshotPublisherCount(namespace, 1); + producer.close(); + admin.topics().unload(topic); + checkSnapshotPublisherCount(namespace, 0); + } + + private void getTopic(String topicName) { + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> { + for (PulsarService pulsarService : pulsarServiceList) { + if (pulsarService.getBrokerService().getTopicReference(topicName).isPresent()) { + return true; + } + } + return false; + }); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index fd49354342f..f45eda8d21f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -41,12 +41,17 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.tests.TestRetrySupport; +import org.awaitility.Awaitility; +import org.testng.Assert; @Slf4j public abstract class TransactionTestBase extends TestRetrySupport { @@ -223,4 +228,19 @@ public abstract class TransactionTestBase extends TestRetrySupport { throws Exception { MockedPulsarServiceBaseTest.deleteNamespaceWithRetry(ns, force, admin, pulsarServiceList); } + + public void checkSnapshotPublisherCount(String namespace, int expectCount) { + TopicName snTopicName = TopicName.get(TopicDomain.persistent.value(), NamespaceName.get(namespace), + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + List<PublisherStats> publisherStatsList = + (List<PublisherStats>) admin.topics() + .getStats(snTopicName.getPartitionedTopicName()).getPublishers(); + Assert.assertEquals(publisherStatsList.size(), expectCount); + }); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java index cd3a14da596..e92cf29521e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java @@ -19,6 +19,9 @@ package org.apache.pulsar.broker.transaction.buffer; import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.transaction.TransactionTestBase; @@ -26,20 +29,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.SystemTopicNames; -import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.util.List; -import java.util.concurrent.TimeUnit; /** * Transaction buffer close test. @@ -71,49 +67,62 @@ public class TransactionBufferCloseTest extends TransactionTestBase { @Test(timeOut = 10_000, dataProvider = "isPartition") public void deleteTopicCloseTransactionBufferTest(boolean isPartition) throws Exception { - int expectedCount = isPartition ? 30 : 1; - TopicName topicName = createAndLoadTopic(isPartition, expectedCount); - checkSnapshotPublisherCount(topicName.getNamespace(), expectedCount); + int partitionCount = isPartition ? 30 : 1; + List<TopicName> topicNames = createAndLoadTopics(isPartition, partitionCount); + String namespaceName = topicNames.get(0).getNamespace(); + checkSnapshotPublisherCount(namespaceName, 1); + + for (int i = 0; i < topicNames.size(); i++) { + deleteTopic(isPartition, topicNames.get(i)); + // When delete all topics of the namespace, the publisher count should be 0. + int expectCount = i == topicNames.size() - 1 ? 0 : 1; + checkSnapshotPublisherCount(namespaceName, expectCount); + } + } + + private void deleteTopic(boolean isPartition, TopicName topicName) throws PulsarAdminException { if (isPartition) { admin.topics().deletePartitionedTopic(topicName.getPartitionedTopicName(), true); } else { admin.topics().delete(topicName.getPartitionedTopicName(), true); } - checkSnapshotPublisherCount(topicName.getNamespace(), 0); } @Test(timeOut = 10_000, dataProvider = "isPartition") public void unloadTopicCloseTransactionBufferTest(boolean isPartition) throws Exception { - int expectedCount = isPartition ? 30 : 1; - TopicName topicName = createAndLoadTopic(isPartition, expectedCount); - checkSnapshotPublisherCount(topicName.getNamespace(), expectedCount); - admin.topics().unload(topicName.getPartitionedTopicName()); - checkSnapshotPublisherCount(topicName.getNamespace(), 0); + int partitionCount = isPartition ? 30 : 1; + List<TopicName> topicNames = createAndLoadTopics(isPartition, partitionCount); + String namespaceName = topicNames.get(0).getNamespace(); + checkSnapshotPublisherCount(namespaceName, 1); + + for (int i = 0; i < topicNames.size(); i++) { + admin.topics().unload(topicNames.get(i).getPartitionedTopicName()); + // When unload all topics of the namespace, the publisher count should be 0. + int expectCount = i == topicNames.size() - 1 ? 0 : 1; + checkSnapshotPublisherCount(namespaceName, expectCount); + } } - private TopicName createAndLoadTopic(boolean isPartition, int partitionCount) + private List<TopicName> createAndLoadTopics(boolean isPartition, int partitionCount) throws PulsarAdminException, PulsarClientException { String namespace = TENANT + "/ns-" + RandomStringUtils.randomAlphabetic(5); admin.namespaces().createNamespace(namespace, 3); - String topic = namespace + "/tb-close-test-"; - if (isPartition) { - admin.topics().createPartitionedTopic(topic, partitionCount); - } - pulsarClient.newProducer() - .topic(topic) - .sendTimeout(0, TimeUnit.SECONDS) - .create() - .close(); - return TopicName.get(topic); - } + String topic = namespace + "/tb-close-test"; + List<TopicName> topics = new ArrayList<>(); - private void checkSnapshotPublisherCount(String namespace, int expectCount) throws PulsarAdminException { - TopicName snTopicName = TopicName.get(TopicDomain.persistent.value(), NamespaceName.get(namespace), - SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); - List<PublisherStats> publisherStatsList = - (List<PublisherStats>) admin.topics() - .getStats(snTopicName.getPartitionedTopicName()).getPublishers(); - Assert.assertEquals(publisherStatsList.size(), expectCount); + for (int i = 0; i < 2; i++) { + String t = topic + "-" + i; + if (isPartition) { + admin.topics().createPartitionedTopic(t, partitionCount); + } + pulsarClient.newProducer() + .topic(t) + .sendTimeout(0, TimeUnit.SECONDS) + .create() + .close(); + topics.add(TopicName.get(t)); + } + return topics; } }