This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f779a4  Reduce memory used in ClientCnx for pending lookups (#4104)
3f779a4 is described below

commit 3f779a401ab72d39150d3df4958660b58ac0e59c
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 22 22:01:14 2019 -0700

    Reduce memory used in ClientCnx for pending lookups (#4104)
    
    ### Motivation
    
    Currently, each `ClientCnx` has a blocking queue for the max number of 
pending lookup requests. By default that ends up using an array of 45K objects. 
When a single process handles many connections (eg: broker or proxy), that will 
end up using 200K per connection just for this.
    
    ### Modifications
    
    Instead of using the fixed array size, use semaphore and a 
`ConcurrentLinkedQueue`
    
    * Reduce memory used in ClientCnx for pending lookups
    
    * Fixed semaphore size
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 41 ++++++++++++----------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 2921777..1a88afd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,12 +21,24 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Queues;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Promise;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledFuture;
@@ -73,18 +85,6 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Queues;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.unix.Errors.NativeIoException;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Promise;
-
 public class ClientCnx extends PulsarHandler {
 
     protected final Authentication authentication;
@@ -95,7 +95,7 @@ public class ClientCnx extends PulsarHandler {
     private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> 
pendingLookupRequests =
         new ConcurrentLongHashMap<>(16, 1);
     // LookupRequests that waiting in client side.
-    private final BlockingQueue<Pair<Long, Pair<ByteBuf, 
CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
+    private final Queue<Pair<Long, Pair<ByteBuf, 
CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
     private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> 
pendingGetLastMessageIdRequests =
         new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<CompletableFuture<List<String>>> 
pendingGetTopicsRequests =
@@ -110,6 +110,7 @@ public class ClientCnx extends PulsarHandler {
     private final CompletableFuture<Void> connectionFuture = new 
CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new 
ConcurrentLinkedQueue<>();
     private final Semaphore pendingLookupRequestSemaphore;
+    private final Semaphore maxLookupRequestSemaphore;
     private final EventLoopGroup eventLoopGroup;
 
     private static final AtomicIntegerFieldUpdater<ClientCnx> 
NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater
@@ -154,9 +155,9 @@ public class ClientCnx extends PulsarHandler {
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, int protocolVersion) {
         super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
         checkArgument(conf.getMaxLookupRequest() > 
conf.getConcurrentLookupRequest());
-        this.pendingLookupRequestSemaphore = new 
Semaphore(conf.getConcurrentLookupRequest(), true);
-        this.waitingLookupRequests = Queues
-            .newArrayBlockingQueue((conf.getMaxLookupRequest() - 
conf.getConcurrentLookupRequest()));
+        this.pendingLookupRequestSemaphore = new 
Semaphore(conf.getConcurrentLookupRequest(), false);
+        this.maxLookupRequestSemaphore = new 
Semaphore(conf.getMaxLookupRequest() - conf.getConcurrentLookupRequest(), 
false);
+        this.waitingLookupRequests = Queues.newConcurrentLinkedQueue();
         this.authentication = conf.getAuthentication();
         this.eventLoopGroup = eventLoopGroup;
         this.maxNumberOfRejectedRequestPerConnection = 
conf.getMaxNumberOfRejectedRequestPerConnection();
@@ -520,6 +521,7 @@ public class ClientCnx extends PulsarHandler {
         if (result != null) {
             Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>> 
firstOneWaiting = waitingLookupRequests.poll();
             if (firstOneWaiting != null) {
+                maxLookupRequestSemaphore.release();
                 // schedule a new lookup in.
                 eventLoopGroup.submit(() -> {
                     long newId = firstOneWaiting.getLeft();
@@ -629,7 +631,10 @@ public class ClientCnx extends PulsarHandler {
             if (log.isDebugEnabled()) {
                 log.debug("{} Failed to add lookup-request into pending 
queue", requestId);
             }
-            if (!waitingLookupRequests.offer(Pair.of(requestId, 
Pair.of(request, future)))) {
+
+            if (maxLookupRequestSemaphore.tryAcquire()) {
+                waitingLookupRequests.add(Pair.of(requestId, Pair.of(request, 
future)));
+            } else {
                 if (log.isDebugEnabled()) {
                     log.debug("{} Failed to add lookup-request into waiting 
queue", requestId);
                 }

Reply via email to