[ https://issues.apache.org/jira/browse/ARTEMIS-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953158#comment-15953158 ]
ASF GitHub Bot commented on ARTEMIS-1025: ----------------------------------------- Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/1119#discussion_r109368909 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java --- @@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object> configuration, this.listener = listener; + this.directDeliver = directDeliver; + this.batchingEnabled = batchingEnabled; - this.directDeliver = directDeliver; + this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark(); + + this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0; + } + + private static void waitFor(ChannelPromise promise, long millis) { + try { + final boolean completed = promise.await(millis); + if (!completed) { + ActiveMQClientLogger.LOGGER.timeoutFlushingPacket(); + } + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + } + + /** + * Returns an estimation of the current size of the write buffer in the channel. + * To obtain a more precise value is necessary to use the unsafe API of the channel to + * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}. + * Anyway, both these values are subject to concurrent modifications. + */ + private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) { + //Channel::bytesBeforeUnwritable is performing a volatile load + //this is the reason why writeBufferHighWaterMark is passed as an argument + final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable(); + assert bytesBeforeUnwritable >= 0; + final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable; + assert writtenBytes >= 0; + return writtenBytes; + } + + /** + * When batching is not enabled, it tries to back-pressure the caller thread. + * The back-pressure provided is not before the writeAndFlush request, buf after it: too many threads that are not + * using {@link Channel#isWritable} to know when push unbatched data will risk to cause OOM due to the enqueue of each own {@link Channel#writeAndFlush} requests. + * Trying to provide back-pressure before the {@link Channel#writeAndFlush} request could work, but in certain scenarios it will block {@link Channel#isWritable} to be true. + */ + private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf bytes, + final int readableBytes, + final Channel channel, + final ChannelPromise promise) { + final ChannelFuture future; + if (!channel.isWritable()) { + final ChannelPromise channelPromise = promise.isVoid() ? channel.newPromise() : promise; + future = channel.writeAndFlush(bytes, channelPromise); + //is the channel is not writable wait the current request to be flushed, providing backpressuring on the caller thread + if (!channel.isWritable() && !future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) { + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { --- End diff -- > on that case, we never block inside the NettyConnector... we verify isWritable().. and stop writing.. I'm not 100 % sure it could work: it is counterintuitive, but not all the pending writes are counted in the write buffer of Netty and having that buffer writable doesn't mean that you can't go OOM :( We'll talk about it tomorrow anyway :+1: > OutOfDirectMemoryError raised from Netty > ---------------------------------------- > > Key: ARTEMIS-1025 > URL: https://issues.apache.org/jira/browse/ARTEMIS-1025 > Project: ActiveMQ Artemis > Issue Type: Bug > Components: Broker > Reporter: Francesco Nigro > Assignee: Francesco Nigro > > If you send and receive a lot of messages in short time to Artemis via Netty > connector, the OutOfDirectMemoryError exception is thrown from the client. -- This message was sent by Atlassian JIRA (v6.3.15#6346)