codelipenghui opened a new issue #13792:
URL: https://github.com/apache/pulsar/issues/13792
**Describe the bug**
Client-side error log:
```
22:35:10.765 [pulsar-client-internal-4-1:org.apache.pulsar.client.impl.TransactionMetaStoreHandler@511] ERROR org.apache.pulsar.client.impl.TransactionMetaStoreHandler - Got END_TXN response for request 2036419640951504857 error UnknownError
```
Server-side error log:
```
22:35:07.055 [pulsar-transaction-timer-43-1] ERROR org.apache.pulsar.broker.service.ServerCnx - Send response error for END_TXN request 2036419640951505362.
org.apache.pulsar.client.api.transaction.TransactionBufferClientException$RequestTimeoutException: Transaction buffer request timeout.
    at org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.run(TransactionBufferHandlerImpl.java:240) [io.streamnative-pulsar-broker-2.9.1.2.jar:2.9.1.2]
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
    at java.lang.Thread.run(Thread.java:829) [?:?]
```
**To Reproduce**
Steps to reproduce the behavior:
1. Run the test code below.
2. Unload the topic `public/test/s_topic` while the test is running (possibly several times).
3. Observe the `UnknownError` in the client log.
Test code:
```java
package io.streamnative.test;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TransactionTest {

    private static final Logger log = LoggerFactory.getLogger(TransactionTest.class);

    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        final String sourceTopic = "public/test/s_topic";
        final String targetTopic = "public/test/t_topic";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://127.0.0.1:6650")
                .enableTransaction(true)
                .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(targetTopic)
                .blockIfQueueFull(true)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(sourceTopic)
                .subscriptionName("sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        final int totalTransactions = 1000000;
        CountDownLatch latch = new CountDownLatch(totalTransactions);

        log.info("Prepare data!");
        Producer<String> sourceProducer = client.newProducer(Schema.STRING)
                .topic(sourceTopic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .maxPendingMessages(10)
                .blockIfQueueFull(true)
                .create();

        new Thread(() -> {
            for (int i = 0; i < totalTransactions; i++) {
                sourceProducer.newMessage().value(i + "").sendAsync();
            }
        }).start();

        log.info("Test started!");
        new Thread(() -> {
            for (int i = 0; i < totalTransactions; i++) {
                try {
                    Message<String> msg = consumer.receive();
                    log.debug("Received message!");
                    CompletableFuture<Transaction> txn = client.newTransaction().build();
                    txn.thenComposeAsync(transaction -> {
                        log.debug("Transaction opened!");
                        return producer.newMessage(transaction).value(msg.getValue()).sendAsync()
                                .thenComposeAsync(messageId -> {
                                    return consumer.acknowledgeAsync(msg.getMessageId(), transaction);
                                }).thenAcceptAsync(__ -> {
                                    transaction.commit().thenRun(() -> {
                                        log.debug("Transaction committed!");
                                        latch.countDown();
                                    }).exceptionally(e -> {
                                        latch.countDown();
                                        return null;
                                    });
                                }).exceptionally(e -> {
                                    transaction.abort().thenRun(() -> {
                                        log.debug("Transaction aborted!");
                                        latch.countDown();
                                    }).exceptionally(ex -> {
                                        latch.countDown();
                                        return null;
                                    });
                                    return null;
                                });
                    }).exceptionally(ex -> {
                        log.error("", ex);
                        return null;
                    });
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        latch.await();
        log.info("Test Done!");
    }
}
```
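Step 2 (unloading the topic several times while the test runs) could be scripted rather than done by hand. The following is only a rough sketch and not part of the original report: the class `PeriodicUnloader` and its method `unloadRepeatedly` are hypothetical names, and the unload action is passed in as a plain `Runnable` so the example compiles without the Pulsar admin dependency. The comment in `main` shows how the action might be wired to `PulsarAdmin`, assuming the admin HTTP endpoint is `http://127.0.0.1:8080` and the full topic name is `persistent://public/test/s_topic`.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicUnloader {

    /**
     * Runs the given unload action `times` times, sleeping `intervalMillis`
     * between attempts. Returns how many attempts completed without throwing.
     */
    public static int unloadRepeatedly(Runnable unloadAction, int times, long intervalMillis)
            throws InterruptedException {
        int succeeded = 0;
        for (int i = 0; i < times; i++) {
            try {
                unloadAction.run();
                succeeded++;
            } catch (RuntimeException e) {
                // A failed unload should not stop the remaining attempts.
                System.err.println("Unload attempt " + i + " failed: " + e);
            }
            if (i < times - 1) {
                Thread.sleep(intervalMillis);
            }
        }
        return succeeded;
    }

    public static void main(String[] args) throws InterruptedException {
        // Placeholder action that only counts calls. With the pulsar-client-admin
        // dependency on the classpath, the action might instead look like this
        // (URL and topic name are assumptions):
        //
        //   PulsarAdmin admin = PulsarAdmin.builder()
        //           .serviceHttpUrl("http://127.0.0.1:8080")
        //           .build();
        //   Runnable unload = () -> {
        //       try {
        //           admin.topics().unload("persistent://public/test/s_topic");
        //       } catch (PulsarAdminException e) {
        //           throw new RuntimeException(e);
        //       }
        //   };
        AtomicInteger calls = new AtomicInteger();
        int ok = unloadRepeatedly(calls::incrementAndGet, 3, 10);
        System.out.println("Unload attempts that succeeded: " + ok);
    }
}
```

Running the unloader in a separate thread or process alongside `TransactionTest` should exercise the same path as the manual unloads, though the interval needed to trigger the timeout is not known from the report.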
**Expected behavior**
- The client should not log `UnknownError` for the `END_TXN` response.
**Additional context**
Broker logs:
[pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882610/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)
[pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882616/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)
[pulsar-broker-lipenghuideMacBook-Pro-2.local.log](https://github.com/apache/pulsar/files/7882618/pulsar-broker-lipenghuideMacBook-Pro-2.local.log)