This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3f779a4 Reduce memory used in ClientCnx for pending lookups (#4104)
3f779a4 is described below
commit 3f779a401ab72d39150d3df4958660b58ac0e59c
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 22 22:01:14 2019 -0700
Reduce memory used in ClientCnx for pending lookups (#4104)
### Motivation
Currently, each `ClientCnx` has a blocking queue sized for the max number of
pending lookup requests. By default that ends up using an array of 45K objects.
When a single process handles many connections (e.g., a broker or proxy), that
ends up using 200K per connection just for this queue.
### Modifications
Instead of using a fixed-size array, use a semaphore and a
`ConcurrentLinkedQueue`.
* Reduce memory used in ClientCnx for pending lookups
* Fixed semaphore size
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 41 ++++++++++++----------
1 file changed, 23 insertions(+), 18 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 2921777..1a88afd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,12 +21,24 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Queues;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Promise;
+
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
@@ -73,18 +85,6 @@ import
org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Queues;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.unix.Errors.NativeIoException;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Promise;
-
public class ClientCnx extends PulsarHandler {
protected final Authentication authentication;
@@ -95,7 +95,7 @@ public class ClientCnx extends PulsarHandler {
private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>>
pendingLookupRequests =
new ConcurrentLongHashMap<>(16, 1);
// LookupRequests that waiting in client side.
- private final BlockingQueue<Pair<Long, Pair<ByteBuf,
CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
+ private final Queue<Pair<Long, Pair<ByteBuf,
CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>>
pendingGetLastMessageIdRequests =
new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<CompletableFuture<List<String>>>
pendingGetTopicsRequests =
@@ -110,6 +110,7 @@ public class ClientCnx extends PulsarHandler {
private final CompletableFuture<Void> connectionFuture = new
CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new
ConcurrentLinkedQueue<>();
private final Semaphore pendingLookupRequestSemaphore;
+ private final Semaphore maxLookupRequestSemaphore;
private final EventLoopGroup eventLoopGroup;
private static final AtomicIntegerFieldUpdater<ClientCnx>
NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater
@@ -154,9 +155,9 @@ public class ClientCnx extends PulsarHandler {
public ClientCnx(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, int protocolVersion) {
super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
checkArgument(conf.getMaxLookupRequest() >
conf.getConcurrentLookupRequest());
- this.pendingLookupRequestSemaphore = new
Semaphore(conf.getConcurrentLookupRequest(), true);
- this.waitingLookupRequests = Queues
- .newArrayBlockingQueue((conf.getMaxLookupRequest() -
conf.getConcurrentLookupRequest()));
+ this.pendingLookupRequestSemaphore = new
Semaphore(conf.getConcurrentLookupRequest(), false);
+ this.maxLookupRequestSemaphore = new
Semaphore(conf.getMaxLookupRequest() - conf.getConcurrentLookupRequest(),
false);
+ this.waitingLookupRequests = Queues.newConcurrentLinkedQueue();
this.authentication = conf.getAuthentication();
this.eventLoopGroup = eventLoopGroup;
this.maxNumberOfRejectedRequestPerConnection =
conf.getMaxNumberOfRejectedRequestPerConnection();
@@ -520,6 +521,7 @@ public class ClientCnx extends PulsarHandler {
if (result != null) {
Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>
firstOneWaiting = waitingLookupRequests.poll();
if (firstOneWaiting != null) {
+ maxLookupRequestSemaphore.release();
// schedule a new lookup in.
eventLoopGroup.submit(() -> {
long newId = firstOneWaiting.getLeft();
@@ -629,7 +631,10 @@ public class ClientCnx extends PulsarHandler {
if (log.isDebugEnabled()) {
log.debug("{} Failed to add lookup-request into pending
queue", requestId);
}
- if (!waitingLookupRequests.offer(Pair.of(requestId,
Pair.of(request, future)))) {
+
+ if (maxLookupRequestSemaphore.tryAcquire()) {
+ waitingLookupRequests.add(Pair.of(requestId, Pair.of(request,
future)));
+ } else {
if (log.isDebugEnabled()) {
log.debug("{} Failed to add lookup-request into waiting
queue", requestId);
}