This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fd1b2b  [pulsar-broker] avoid backpressure by skipping dispatching if 
consumer channel is not writable (#6740)
1fd1b2b is described below

commit 1fd1b2b440af2477f916999a67752f9f532d1620
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Apr 15 16:00:14 2020 -0700

    [pulsar-broker] avoid backpressure by skipping dispatching if consumer 
channel is not writable (#6740)
    
    ### Motivation
    Recently we have seen the broker crash with OutOfMemoryError when it has a 
high dispatch rate with large messages. A high outbound message rate saturates 
the network, and the broker keeps trying to write to a channel that is not 
writable, which buffers the messages; eventually the broker hits OOM and shuts 
down with the error below:
    ```
    java.lang.OutOfMemoryError: Direct buffer memory
            at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
            at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) 
~[?:?]
            at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
            at 
io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164)
 ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
            at 
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158)
 ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) 
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
 ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265)
 ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
            at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) 
[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at java.lang.Thread.run(Thread.java:834) [?:?]
    ```
    
    ### Modification
    In order to reduce backpressure, the broker should slow down dispatching if 
the consumer's cnx-channel is not writable. The broker already does this for 
the replicator and for exclusive consumers, but not for shared consumers. So, 
add a similar check for shared subscriptions to avoid OOM at high dispatch 
rates. It might be helpful for #5896 as well.
---
 .../PersistentDispatcherMultipleConsumers.java     | 27 +++++++++++++++++++++-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  7 +++++-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4c9aef1..74d131f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -254,6 +254,14 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
             int messagesToRead = Math.min(totalAvailablePermits, 
readBatchSize);
 
+            if (!isConsumerWritable()) {
+                // If the connection is not currently writable, we issue the 
read request anyway, but for a single
+                // message. The intent here is to keep use the request as a 
notification mechanism while avoiding to
+                // read and dispatch a big batch of messages which will need 
to wait before getting written to the
+                // socket.
+                messagesToRead = 1;
+            }
+
             // throttle only if: (1) cursor is not active (or flag for 
throttle-nonBacklogConsumer is enabled) bcz
             // active-cursor reads message from cache rather from bookkeeper 
(2) if topic has reached message-rate
             // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
@@ -476,8 +484,13 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             }
 
             // round-robin dispatch batch size for this consumer
+            int availablePermits = c.isWritable() ? c.getAvailablePermits() : 
1;
+            if (log.isDebugEnabled() && !c.isWritable()) {
+                log.debug("[{}-{}] consumer is not writable. dispatching only 
1 message to {} ", topic.getName(), name,
+                        c);
+            }
             int messagesForC = Math.min(
-                    Math.min(entriesToDispatch, c.getAvailablePermits()),
+                    Math.min(entriesToDispatch, availablePermits),
                     serviceConfig.getDispatcherMaxRoundRobinBatchSize());
 
             if (messagesForC > 0) {
@@ -614,6 +627,18 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         return false;
     }
 
+    private boolean isConsumerWritable() {
+        for (Consumer consumer : consumerList) {
+            if (consumer.isWritable()) {
+                return true;
+            }
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] consumer is not writable", topic.getName(), 
name);
+        }
+        return false;
+    }
+
     @Override
     public boolean isConsumerAvailable(Consumer consumer) {
         return consumer != null && !consumer.isBlocked() && 
consumer.getAvailablePermits() > 0;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 248f45a..c651782 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -92,7 +92,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 return;
             }
 
-            int messagesForC = Math.min(entriesWithSameKey.getValue().size(), 
consumer.getAvailablePermits());
+            int availablePermits = consumer.isWritable() ? 
consumer.getAvailablePermits() : 1;
+            if (log.isDebugEnabled() && !consumer.isWritable()) {
+                log.debug("[{}-{}] consumer is not writable. dispatching only 
1 message to {} ", topic.getName(), name,
+                        consumer);
+            }
+            int messagesForC = Math.min(entriesWithSameKey.getValue().size(), 
availablePermits);
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} for key {} with messages 
num {}, read type is {}",
                         name, consumer.consumerName(), 
entriesWithSameKey.getKey(), messagesForC, readType);

Reply via email to