This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new d9211c97776 [improve][java-client] Replace ScheduledExecutor to
improve performance of message consumption (#16236)
d9211c97776 is described below
commit d9211c97776fe61baddb4d9b3c4049dfd37b48c7
Author: lipenghui <[email protected]>
AuthorDate: Tue Jun 28 11:29:32 2022 +0800
[improve][java-client] Replace ScheduledExecutor to improve performance of
message consumption (#16236)
The Scheduled Executor doesn't work very efficiently because each task will
add to a DelayedQueue(A priority queue) first even if using the `.execute()`
method without any schedule delay.
<img width="1845" alt="image"
src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png">
<img width="1848" alt="image"
src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png">
Profile result:
[perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)
Running a performance test for single topic max message read rate test:
```
bin/pulsar-perf consume test -q 1000000 -p 100000000
bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
```
Without this PR (2.10.1):
```
Profiling started
2022-06-27T13:44:01,183+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
23919664 msg --- 265702.851 msg/s --- 2.027 Mbit/s --- Latency: mean:
49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 -
99.99pct: 53056 - Max: 53057
2022-06-27T13:44:11,196+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
26690802 msg --- 276759.125 msg/s --- 2.112 Mbit/s --- Latency: mean:
56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 -
99.99pct: 60042 - Max: 60042
2022-06-27T13:44:21,216+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
28788693 msg --- 209467.861 msg/s --- 1.598 Mbit/s --- Latency: mean:
63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 -
99.99pct: 67548 - Max: 67548
2022-06-27T13:44:31,233+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
31255365 msg --- 246190.932 msg/s --- 1.878 Mbit/s --- Latency: mean:
71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 -
99.99pct: 74847 - Max: 74847
2022-06-27T13:44:41,247+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
33606630 msg --- 234769.313 msg/s --- 1.791 Mbit/s --- Latency: mean:
78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 -
99.99pct: 82285 - Max: 82286
```
With this PR:
```
Profiling started
2022-06-27T13:56:20,426+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
431272207 msg --- 1079360.516 msg/s --- 8.235 Mbit/s --- Latency: mean:
272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523
- Max: 524
2022-06-27T13:56:30,438+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
441292346 msg --- 1000645.852 msg/s --- 7.634 Mbit/s --- Latency: mean:
15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max:
55
2022-06-27T13:56:40,450+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
451303308 msg --- 999973.040 msg/s --- 7.629 Mbit/s --- Latency: mean: 18.265
ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
2022-06-27T13:56:50,462+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
461308082 msg --- 999309.458 msg/s --- 7.624 Mbit/s --- Latency: mean: 14.728
ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
2022-06-27T13:57:00,475+0800 [main] INFO
org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:
471327606 msg --- 1000738.584 msg/s --- 7.635 Mbit/s --- Latency: mean:
21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max:
66
```
Profile result with this PR:
[perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)
- Change internal executor and external executor to normal executor service
- Added a new ScheduledExecutorProvider to handle the scheduled tasks.
(cherry picked from commit 96237a9615fefa2bed247b416bf1a12d8bc4b201)
---
.../transaction/pendingack/PendingAckStore.java | 4 +-
.../pendingack/impl/InMemoryPendingAckStore.java | 4 +-
.../pendingack/impl/MLPendingAckStore.java | 4 +-
.../pendingack/impl/PendingAckHandleImpl.java | 4 +-
.../persistent/PersistentSubscriptionTest.java | 4 +-
.../pulsar/client/api/MultiTopicsConsumerTest.java | 2 +-
.../apache/pulsar/client/impl/ConsumerBase.java | 9 ++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 13 +++---
.../client/impl/MultiTopicsConsumerImpl.java | 47 +++++++++++-----------
.../pulsar/client/impl/PulsarClientImpl.java | 12 +++++-
.../pulsar/client/util/ExecutorProvider.java | 10 +++--
.../client/util/ScheduledExecutorProvider.java | 36 +++++++++++++++++
12 files changed, 99 insertions(+), 50 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
index 3da676eb827..2f85d2430db 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
@@ -38,7 +38,7 @@ public interface PendingAckStore {
* @param pendingAckHandle the handle of pending ack
* @param executorService the replay executor service
*/
- void replayAsync(PendingAckHandleImpl pendingAckHandle,
ScheduledExecutorService executorService);
+ void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService
executorService);
/**
* Close the transaction pending ack store.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
index d882c80c478..44c9fbe039b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -33,7 +33,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
public class InMemoryPendingAckStore implements PendingAckStore {
@Override
- public void replayAsync(PendingAckHandleImpl pendingAckHandle,
ScheduledExecutorService scheduledExecutorService) {
+ public void replayAsync(PendingAckHandleImpl pendingAckHandle,
ExecutorService scheduledExecutorService) {
pendingAckHandle.changeToReadyState();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index e6d16fb7eae..af4e664b1e3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
@@ -110,7 +110,7 @@ public class MLPendingAckStore implements PendingAckStore {
}
@Override
- public void replayAsync(PendingAckHandleImpl pendingAckHandle,
ScheduledExecutorService transactionReplayExecutor) {
+ public void replayAsync(PendingAckHandleImpl pendingAckHandle,
ExecutorService transactionReplayExecutor) {
transactionReplayExecutor
.execute(new PendingAckReplay(new
MLPendingAckReplyCallBack(pendingAckHandle)));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 5b808f1dedb..41ef25b3e4d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -156,8 +155,7 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
this.pendingAckStoreFuture =
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
- pendingAckStore.replayAsync(this,
- (ScheduledExecutorService) internalPinnedExecutor);
+ pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionally(e -> {
acceptQueue.clear();
changeToErrorState();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index b9304cb5fb8..946f90a1ddd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -40,7 +40,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -127,7 +127,7 @@ public class PersistentSubscriptionTest {
public CompletableFuture<PendingAckStore>
newPendingAckStore(PersistentSubscription subscription) {
return CompletableFuture.completedFuture(new PendingAckStore()
{
@Override
- public void replayAsync(PendingAckHandleImpl
pendingAckHandle, ScheduledExecutorService executorService) {
+ public void replayAsync(PendingAckHandleImpl
pendingAckHandle, ExecutorService executorService) {
try {
Field field =
PendingAckHandleState.class.getDeclaredField("state");
field.setAccessible(true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index d8c8bd657f8..29ecb39853a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -72,7 +72,7 @@ public class MultiTopicsConsumerTest extends
ProducerConsumerBase {
return new PulsarClientImpl(conf) {
{
ScheduledExecutorService internalExecutorService =
- (ScheduledExecutorService)
super.getInternalExecutorService();
+ (ScheduledExecutorService)
super.getScheduledExecutorProvider().getExecutor();
internalExecutorServiceDelegate =
mock(ScheduledExecutorService.class,
// a spy isn't used since that doesn't work for
private classes, instead
// the mock delegatesTo an existing instance. A
delegate is sufficient for verifying
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 71fb2d62756..c53d49ad4bd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
@@ -69,8 +68,8 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorProvider executorProvider;
- protected final ScheduledExecutorService externalPinnedExecutor;
- protected final ScheduledExecutorService internalPinnedExecutor;
+ protected final ExecutorService externalPinnedExecutor;
+ protected final ExecutorService internalPinnedExecutor;
final BlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]>
unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives;
@@ -102,8 +101,8 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
this.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdImpl,
MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
- this.externalPinnedExecutor = (ScheduledExecutorService)
executorProvider.getExecutor();
- this.internalPinnedExecutor = (ScheduledExecutorService)
client.getInternalExecutorService();
+ this.externalPinnedExecutor = executorProvider.getExecutor();
+ this.internalPinnedExecutor = client.getInternalExecutorService();
this.pendingReceives = Queues.newConcurrentLinkedQueue();
this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
this.schema = schema;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3bcf95e7813..1a185d4c17d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1267,10 +1268,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// Lazy task scheduling to expire incomplete chunk message
if (!expireChunkMessageTaskScheduled &&
expireTimeOfIncompleteChunkedMessageMillis > 0) {
- internalPinnedExecutor
-
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
- expireTimeOfIncompleteChunkedMessageMillis,
expireTimeOfIncompleteChunkedMessageMillis,
- TimeUnit.MILLISECONDS);
+ ((ScheduledExecutorService)
client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
+ () -> internalPinnedExecutor
+
.execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)),
+ expireTimeOfIncompleteChunkedMessageMillis,
expireTimeOfIncompleteChunkedMessageMillis,
+ TimeUnit.MILLISECONDS
+ );
expireChunkMessageTaskScheduled = true;
}
@@ -2236,7 +2239,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return;
}
- internalPinnedExecutor.schedule(() -> {
+ ((ScheduledExecutorService)
client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
log.warn("[{}] [{}] Could not get connection while
getLastMessageId -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 71fef6f83f0..2c3d8cb03a6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -24,28 +24,6 @@ import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerStats;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Messages;
-import org.apache.pulsar.client.api.PulsarClientException;
-import
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.client.util.ConsumerName;
-import org.apache.pulsar.client.util.ExecutorProvider;
-import org.apache.pulsar.common.api.proto.CommandAck.AckType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -61,6 +39,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -69,6 +48,27 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClientException;
+import
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -280,7 +280,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return null;
}
log.error("Receive operation failed on consumer {} - Retrying
later", consumer, ex);
- internalPinnedExecutor.schedule(() ->
receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+ ((ScheduledExecutorService) client.getScheduledExecutorProvider())
+ .schedule(() -> receiveMessageFromConsumer(consumer), 10,
TimeUnit.SECONDS);
return null;
});
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 3f14558a7ed..9a4bada3278 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -75,6 +75,7 @@ import
org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvid
import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
import
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -99,6 +100,8 @@ public class PulsarClientImpl implements PulsarClient {
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
+
+ private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
private final boolean createdCnxPool;
@@ -184,6 +187,8 @@ public class PulsarClientImpl implements PulsarClient {
new ExecutorProvider(conf.getNumListenerThreads(),
"pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ?
internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(),
"pulsar-client-internal");
+ this.scheduledExecutorProvider = new
ScheduledExecutorProvider(conf.getNumIoThreads(),
+ "pulsar-client-scheduled");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
@@ -949,7 +954,7 @@ public class PulsarClientImpl implements PulsarClient {
}
previousExceptions.add(e);
- ((ScheduledExecutorService)
externalExecutorProvider.getExecutor()).schedule(() -> {
+ ((ScheduledExecutorService)
scheduledExecutorProvider.getExecutor()).schedule(() -> {
log.warn("[topic: {}] Could not get connection while
getPartitionedTopicMetadata -- "
+ "Will try again in {} ms", topicName, nextDelay);
remainingTime.addAndGet(-nextDelay);
@@ -1071,6 +1076,11 @@ public class PulsarClientImpl implements PulsarClient {
public ExecutorService getInternalExecutorService() {
return internalExecutorProvider.getExecutor();
}
+
+ public ScheduledExecutorProvider getScheduledExecutorProvider() {
+ return scheduledExecutorProvider;
+ }
+
//
// Transaction related API
//
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 1318d5665ae..db11358057f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -28,7 +28,6 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -43,7 +42,7 @@ public class ExecutorProvider {
private final String poolName;
private volatile boolean isShutdown;
- private static class ExtendedThreadFactory extends DefaultThreadFactory {
+ protected static class ExtendedThreadFactory extends DefaultThreadFactory {
@Getter
private Thread thread;
@@ -58,7 +57,6 @@ public class ExecutorProvider {
}
}
-
public ExecutorProvider(int numThreads, String poolName) {
checkArgument(numThreads > 0);
this.numThreads = numThreads;
@@ -67,13 +65,17 @@ public class ExecutorProvider {
for (int i = 0; i < numThreads; i++) {
ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
poolName, Thread.currentThread().isDaemon());
- ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(threadFactory);
+ ExecutorService executor = createExecutor(threadFactory);
executors.add(Pair.of(executor, threadFactory));
}
isShutdown = false;
this.poolName = poolName;
}
+ protected ExecutorService createExecutor(ExtendedThreadFactory
threadFactory) {
+ return Executors.newSingleThreadExecutor(threadFactory);
+ }
+
public ExecutorService getExecutor() {
return executors.get((currentThread.getAndIncrement() &
Integer.MAX_VALUE) % numThreads).getKey();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
new file mode 100644
index 00000000000..887ae3bb7ff
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ScheduledExecutorProvider extends ExecutorProvider {
+
+ public ScheduledExecutorProvider(int numThreads, String poolName) {
+ super(numThreads, poolName);
+ }
+
+ @Override
+ protected ExecutorService createExecutor(ExtendedThreadFactory
threadFactory) {
+ return Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+}