[ https://issues.apache.org/jira/browse/ARTEMIS-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953158#comment-15953158 ]

ASF GitHub Bot commented on ARTEMIS-1025:
-----------------------------------------

Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109368909
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object> configuration,
     
           this.listener = listener;
     
    +      this.directDeliver = directDeliver;
    +
           this.batchingEnabled = batchingEnabled;
     
    -      this.directDeliver = directDeliver;
    +      this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
    +
    +      this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
    +   }
    +
    +   private static void waitFor(ChannelPromise promise, long millis) {
    +      try {
    +         final boolean completed = promise.await(millis);
    +         if (!completed) {
    +            ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
    +         }
    +      } catch (InterruptedException e) {
    +         throw new ActiveMQInterruptedException(e);
    +      }
    +   }
    +
    +   /**
    +    * Returns an estimate of the current size of the channel's write buffer.
    +    * Obtaining a more precise value requires the channel's unsafe API in order to
    +    * call {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
    +    * Either way, both values are subject to concurrent modification.
    +    */
    +   private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
    +      //Channel::bytesBeforeUnwritable is performing a volatile load
    +      //this is the reason why writeBufferHighWaterMark is passed as an argument
    +      final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
    +      assert bytesBeforeUnwritable >= 0;
    +      final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
    +      assert writtenBytes >= 0;
    +      return writtenBytes;
    +   }
    +
    +   /**
    +    * When batching is not enabled, it tries to back-pressure the caller thread.
    +    * The back-pressure is applied not before the writeAndFlush request, but after it: too many threads that
    +    * do not use {@link Channel#isWritable} to know when to push unbatched data risk causing an OOM, because
    +    * each of them enqueues its own {@link Channel#writeAndFlush} requests.
    +    * Trying to provide back-pressure before the {@link Channel#writeAndFlush} request could work, but in certain
    +    * scenarios it will block {@link Channel#isWritable} to be true.
    +    */
    +   private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf bytes,
    +                                                           final int readableBytes,
    +                                                           final Channel channel,
    +                                                           final ChannelPromise promise) {
    +      final ChannelFuture future;
    +      if (!channel.isWritable()) {
    +         final ChannelPromise channelPromise = promise.isVoid() ? channel.newPromise() : promise;
    +         future = channel.writeAndFlush(bytes, channelPromise);
    +         //if the channel is not writable, wait for the current request to be flushed, providing back-pressure on the caller thread
    +         if (!channel.isWritable() && !future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) {
    +            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    --- End diff --
    
    > in that case, we never block inside the NettyConnector... we verify isWritable()... and stop writing...
    
    I'm not 100% sure that would work: it is counterintuitive, but not all pending writes are counted in Netty's write buffer, so that buffer being writable doesn't mean you can't go OOM :(
    We'll talk about it tomorrow anyway :+1:
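
The concern in the comment above can be sketched with a toy model in plain Java. This is not Netty code and makes no claim about how Netty accounts for bytes internally: it simply assumes, as the comment suggests, that some submitted writes are not yet counted toward writability. The class `WritabilityModel` and all of its members are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model, NOT Netty: every name here is hypothetical. */
final class WritabilityModel {
    private final int highWaterMark;
    // Bytes that the model counts when answering isWritable().
    private long accountedBytes;
    // Writes submitted by callers but not yet processed by the (simulated) event loop.
    // ASSUMPTION: these are not counted toward writability until drained.
    private final Queue<Integer> submittedNotYetAccounted = new ArrayDeque<>();

    WritabilityModel(int highWaterMark) {
        this.highWaterMark = highWaterMark;
    }

    /** Writable as long as the accounted bytes stay below the high water mark. */
    boolean isWritable() {
        return accountedBytes < highWaterMark;
    }

    /** A caller submits a write; memory is retained, but nothing is accounted yet. */
    void submit(int bytes) {
        submittedNotYetAccounted.add(bytes);
    }

    /** The simulated event loop picks up submitted writes and accounts for them. */
    void eventLoopDrain() {
        Integer bytes;
        while ((bytes = submittedNotYetAccounted.poll()) != null) {
            accountedBytes += bytes;
        }
    }

    /** Total bytes actually held in memory: accounted plus not-yet-accounted. */
    long retainedBytes() {
        long sum = accountedBytes;
        for (int b : submittedNotYetAccounted) {
            sum += b;
        }
        return sum;
    }

    public static void main(String[] args) {
        WritabilityModel channel = new WritabilityModel(64 * 1024);
        int writes = 0;
        // The caller "verifies isWritable() and stops writing", capped at 1000 writes.
        while (channel.isWritable() && writes < 1000) {
            channel.submit(1024);
            writes++;
        }
        System.out.println("writes submitted: " + writes);
        System.out.println("retained bytes: " + channel.retainedBytes());
        System.out.println("writable before drain: " + channel.isWritable());
        channel.eventLoopDrain();
        System.out.println("writable after drain: " + channel.isWritable());
    }
}
```

In this model the caller never sees the channel become unwritable until the simulated event loop runs, so a check on `isWritable()` alone does not bound the memory retained by pending writes. Whether and when real Netty behaves this way depends on its version and configuration, which this sketch does not try to reproduce.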


> OutOfDirectMemoryError raised from Netty
> ----------------------------------------
>
>                 Key: ARTEMIS-1025
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1025
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker
>            Reporter: Francesco Nigro
>            Assignee: Francesco Nigro
>
> If you send and receive a lot of messages in a short time to Artemis via the Netty 
> connector, an OutOfDirectMemoryError is thrown by the client.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
