This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b9446aa01d4 [fix] [tx] Transaction buffer recover blocked by readNext (#18833)
b9446aa01d4 is described below
commit b9446aa01d4c22a170e305cb4a8fbd2966eaff74
Author: fengyubiao <[email protected]>
AuthorDate: Sat Dec 17 09:22:36 2022 +0800
[fix] [tx] Transaction buffer recover blocked by readNext (#18833)
---
.../SingleSnapshotAbortedTxnProcessorImpl.java | 25 +++++--
.../TopicTransactionBufferRecoverTest.java | 81 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index a13dd0499a6..f8d0d323912 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -33,6 +36,7 @@ import
org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -78,16 +82,21 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso
return aborts.containsKey(txnID);
}
+ /**
+ * Returns the operation timeout, in milliseconds, configured on the broker's
+ * internal Pulsar client; recoverFromSnapshot() uses it to bound each snapshot read.
+ * NOTE(review): the client is cast to PulsarClientImpl, so any other client
+ * implementation would fail here with a ClassCastException. The broad
+ * "throws Exception" presumably comes from getClient() declaring a checked
+ * exception - confirm.
+ */
+ private long getSystemClientOperationTimeoutMs() throws Exception {
+ PulsarClientImpl pulsarClient = (PulsarClientImpl)
topic.getBrokerService().getPulsar().getClient();
+ return pulsarClient.getConfiguration().getOperationTimeoutMs();
+ }
+ /**
+ * Recovers this topic's aborted-transaction state from the transaction buffer
+ * snapshot system topic, reading every event through a reader created for this topic.
+ * <p>Each read waits at most the system client's operation timeout. A timed-out read
+ * fails the returned future with ServiceUnitNotReadyException; any other error fails
+ * it with the caught exception. The reader is closed on every path.
+ * @return future completing with the start read cursor position; it starts as null
+ *         and presumably stays null unless a snapshot keyed by this topic sets it
+ *         (that assignment is outside this excerpt - confirm)
+ */
@Override
public CompletableFuture<PositionImpl> recoverFromSnapshot() {
return
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- PositionImpl startReadCursorPosition = null;
try {
+ PositionImpl startReadCursorPosition = null;
while (reader.hasMoreEvents()) {
- Message<TransactionBufferSnapshot> message =
reader.readNext();
+ // Bounded wait instead of the blocking readNext(): a hung read now fails this
+ // recover attempt instead of blocking the transaction executor thread indefinitely.
+ // NOTE(review): the timeout is re-read from the client configuration on every
+ // iteration; it could be computed once before the loop.
+ Message<TransactionBufferSnapshot> message =
reader.readNextAsync()
+ .get(getSystemClientOperationTimeoutMs(),
TimeUnit.MILLISECONDS);
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot
transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
@@ -98,15 +107,21 @@ public class SingleSnapshotAbortedTxnProcessorImpl
implements AbortedTxnProcesso
}
}
}
- closeReader(reader);
return
CompletableFuture.completedFuture(startReadCursorPosition);
+ } catch (TimeoutException ex) {
+ // NOTE(review): ex is a TimeoutException rather than a CompletionException, so this
+ // unwrap presumably returns it unchanged - verify against FutureUtil.
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ String errorMessage = String.format("[%s] Transaction
buffer recover fail by read "
+ + "transactionBufferSnapshot timeout!",
topic.getName());
+ log.error(errorMessage, t);
+ return FutureUtil.failedFuture(
+ new
BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
} catch (Exception ex) {
+ // NOTE(review): Future.get(...) can also throw InterruptedException, which is caught
+ // here without restoring the thread's interrupt flag; an ExecutionException is
+ // propagated still wrapped.
log.error("[{}] Transaction buffer recover fail when
read "
+ "transactionBufferSnapshot!",
topic.getName(), ex);
- closeReader(reader);
return FutureUtil.failedFuture(ex);
+ } finally {
+ // Runs on success and on both failure paths, replacing the per-branch closeReader calls.
+ closeReader(reader);
}
-
},
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 39c324d92f3..a2b72fc458d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.broker.transaction;
import static
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -35,8 +38,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -104,6 +109,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
+ conf.getProperties().setProperty("brokerClient_operationTimeoutMs",
Integer.valueOf(10 * 1000).toString());
setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
@@ -248,6 +254,81 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
}
+ /**
+ * On every broker in pulsarServiceList, replaces the transaction-buffer snapshot
+ * service with a spy whose createReader(topicName) returns one shared mock reader.
+ * <p>The mock reports a single pending event: hasMoreEvents() returns true only on its
+ * very first call across all brokers. The first readNextAsync() does not complete for
+ * an hour, so a recover attempt that waits on it with a timeout will time out. Every
+ * later readNextAsync() completes immediately with null.
+ */
+ private void makeTBSnapshotReaderTimeoutIfFirstRead(TopicName topicName)
throws Exception {
+ SystemTopicClient.Reader mockReader =
mock(SystemTopicClient.Reader.class);
+ AtomicBoolean isFirstCallOfMethodHasMoreEvents = new AtomicBoolean();
+ AtomicBoolean isFirstCallOfMethodHasReadNext = new AtomicBoolean();
+ AtomicBoolean isFirstCallOfMethodHasReadNextAsync = new
AtomicBoolean();
+
+ doAnswer(invocation -> {
+ if (isFirstCallOfMethodHasMoreEvents.compareAndSet(false,true)){
+ return true;
+ } else {
+ return false;
+ }
+ }).when(mockReader).hasMoreEvents();
+
+ doAnswer(invocation -> {
+ if (isFirstCallOfMethodHasReadNext.compareAndSet(false, true)){
+ // Stub for the blocking readNext() path: hang the caller for an hour on the first call.
+ Thread.sleep(3600 * 1000);
+ }
+ return null;
+ }).when(mockReader).readNext();
+
+ doAnswer(invocation -> {
+ CompletableFuture<Message> future = new CompletableFuture<>();
+ // NOTE(review): a new raw Thread is started per call; the first one sleeps for an
+ // hour and is never interrupted, so it outlives this test.
+ new Thread(() -> {
+ if (isFirstCallOfMethodHasReadNextAsync.compareAndSet(false,
true)){
+ // First call: keep the future incomplete for an hour to simulate a stuck read.
+ try {
+ Thread.sleep(3600 * 1000);
+ } catch (InterruptedException e) {
+ // ignored: the future is completed just below either way
+ }
+ future.complete(null);
+ } else {
+ future.complete(null);
+ }
+ }).start();
+ return future;
+ }).when(mockReader).readNextAsync();
+
+
when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+ for (PulsarService pulsarService : pulsarServiceList){
+ // Swap this broker's snapshot service for a spy that hands out the mock reader for topicName.
+ final SystemTopicTxnBufferSnapshotService tbSnapshotService =
+
pulsarService.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService();
+ SystemTopicTxnBufferSnapshotService spyTbSnapshotService =
spy(tbSnapshotService);
+ doAnswer(invocation ->
CompletableFuture.completedFuture(mockReader))
+ .when(spyTbSnapshotService).createReader(topicName);
+ // Install the spy by overwriting the factory's non-public field via reflection.
+ Field field =
+
TransactionBufferSnapshotServiceFactory.class.getDeclaredField("txnBufferSnapshotService");
+ field.setAccessible(true);
+
field.set(pulsarService.getTransactionBufferSnapshotServiceFactory(),
spyTbSnapshotService);
+ }
+ }
+
+ /**
+ * Checks that transaction buffer recovery survives a snapshot read that times out.
+ * The first snapshot read for the topic hangs, yet producer creation on the topic
+ * must still succeed within the 60s test timeout (presumably via a retried recover).
+ */
+ @Test(timeOut = 60 * 1000)
+ public void testTBRecoverCanRetryIfTimeoutRead() throws Exception {
+ String topicName = String.format("persistent://%s/%s", NAMESPACE1,
+ "tx_recover_" + UUID.randomUUID().toString().replaceAll("-",
"_"));
+
+ // Make the first snapshot read for this topic hang so that the first recover attempt
+ // hits the broker client operation timeout (set to 10s in setup()).
+ makeTBSnapshotReaderTimeoutIfFirstRead(TopicName.get(topicName));
+ // Verify: per the author, creating a producer waits for TB recover to finish, so create()
+ // returning within the test timeout shows that recovery eventually completed.
+ // NOTE(review): batchingMaxMessages(2) has no effect because batching is disabled.
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .batchingMaxMessages(2)
+ .create();
+
+ // cleanup.
+ producer.close();
+ admin.topics().delete(topicName, false);
+ }
+
private void testTakeSnapshot() throws Exception {
@Cleanup
Producer<String> producer = pulsarClient