This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1fd1b2b [pulsar-broker] avoid backpressure by skipping dispatching if
consumer channel is not writable (#6740)
1fd1b2b is described below
commit 1fd1b2b440af2477f916999a67752f9f532d1620
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Apr 15 16:00:14 2020 -0700
[pulsar-broker] avoid backpressure by skipping dispatching if consumer
channel is not writable (#6740)
### Motivation
Recently we have seen the broker crash with OutOfMemory when it has a
high dispatch rate with large messages. A high outbound message rate saturates
the network, and the broker keeps trying to write to a channel that is not
writable, which buffers the messages; eventually the broker runs out of direct
memory and shuts down with the error below:
```
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
~[?:?]
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
at
io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164)
~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
at
org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158)
~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
at
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265)
~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335)
[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
[netty-all-4.1.32.Final.jar:4.1.32.Final]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[netty-all-4.1.32.Final.jar:4.1.32.Final]
at java.lang.Thread.run(Thread.java:834) [?:?]
```
### Modification
In order to reduce backpressure, the broker should slow down dispatching if
the consumer cnx-channel is not writable. The broker already does this for
replicators and Exclusive consumers, but not for shared consumers. So, add a
similar check for shared subscriptions to avoid OOM at high dispatch rates. It
might be helpful for #5896 as well.
---
.../PersistentDispatcherMultipleConsumers.java | 27 +++++++++++++++++++++-
...istentStickyKeyDispatcherMultipleConsumers.java | 7 +++++-
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4c9aef1..74d131f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -254,6 +254,14 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits,
readBatchSize);
+ if (!isConsumerWritable()) {
+ // If the connection is not currently writable, we issue the
read request anyway, but for a single
+ // message. The intent here is to keep use the request as a
notification mechanism while avoiding to
+ // read and dispatch a big batch of messages which will need
to wait before getting written to the
+ // socket.
+ messagesToRead = 1;
+ }
+
// throttle only if: (1) cursor is not active (or flag for
throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper
(2) if topic has reached message-rate
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
@@ -476,8 +484,13 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
// round-robin dispatch batch size for this consumer
+ int availablePermits = c.isWritable() ? c.getAvailablePermits() :
1;
+ if (log.isDebugEnabled() && !c.isWritable()) {
+ log.debug("[{}-{}] consumer is not writable. dispatching only
1 message to {} ", topic.getName(), name,
+ c);
+ }
int messagesForC = Math.min(
- Math.min(entriesToDispatch, c.getAvailablePermits()),
+ Math.min(entriesToDispatch, availablePermits),
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
if (messagesForC > 0) {
@@ -614,6 +627,18 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
return false;
}
+ private boolean isConsumerWritable() {
+ for (Consumer consumer : consumerList) {
+ if (consumer.isWritable()) {
+ return true;
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] consumer is not writable", topic.getName(),
name);
+ }
+ return false;
+ }
+
@Override
public boolean isConsumerAvailable(Consumer consumer) {
return consumer != null && !consumer.isBlocked() &&
consumer.getAvailablePermits() > 0;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 248f45a..c651782 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -92,7 +92,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
return;
}
- int messagesForC = Math.min(entriesWithSameKey.getValue().size(),
consumer.getAvailablePermits());
+ int availablePermits = consumer.isWritable() ?
consumer.getAvailablePermits() : 1;
+ if (log.isDebugEnabled() && !consumer.isWritable()) {
+ log.debug("[{}-{}] consumer is not writable. dispatching only
1 message to {} ", topic.getName(), name,
+ consumer);
+ }
+ int messagesForC = Math.min(entriesWithSameKey.getValue().size(),
availablePermits);
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} for key {} with messages
num {}, read type is {}",
name, consumer.consumerName(),
entriesWithSameKey.getKey(), messagesForC, readType);