[ https://issues.apache.org/jira/browse/ARTEMIS-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15952923#comment-15952923 ]

ASF GitHub Bot commented on ARTEMIS-1025:
-----------------------------------------

Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109327492
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object> configuration,
     
           this.listener = listener;
     
    +      this.directDeliver = directDeliver;
    +
           this.batchingEnabled = batchingEnabled;
     
    -      this.directDeliver = directDeliver;
    +      this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
    +
    +      this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
    +   }
    +
    +   private static void waitFor(ChannelPromise promise, long millis) {
    +      try {
    +         final boolean completed = promise.await(millis);
    +         if (!completed) {
    +            ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
    +         }
    +      } catch (InterruptedException e) {
    +         throw new ActiveMQInterruptedException(e);
    +      }
    +   }
    +
    +   /**
    +    * Returns an estimation of the current size of the write buffer in the channel.
    +    * To obtain a more precise value it is necessary to use the unsafe API of the channel to
    +    * call {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
    +    * Anyway, both these values are subject to concurrent modifications.
    +    */
    +   private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
    +      //Channel::bytesBeforeUnwritable is performing a volatile load
    +      //this is the reason why writeBufferHighWaterMark is passed as an argument
    +      final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
    +      assert bytesBeforeUnwritable >= 0;
    +      final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
    +      assert writtenBytes >= 0;
    +      return writtenBytes;
    +   }
    +
    +   /**
    +    * When batching is not enabled, it tries to back-pressure the caller thread.
    +    * The back-pressure provided is not before the writeAndFlush request, but after it: too many threads that are not
    +    * using {@link Channel#isWritable} to know when to push unbatched data risk causing OOM due to the enqueueing of their own {@link Channel#writeAndFlush} requests.
    +    * Trying to provide back-pressure before the {@link Channel#writeAndFlush} request could work, but in certain scenarios it would block waiting for {@link Channel#isWritable} to become true.
    +    */
    +   private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf bytes,
    +                                                           final int readableBytes,
    +                                                           final Channel channel,
    +                                                           final ChannelPromise promise) {
    +      final ChannelFuture future;
    +      if (!channel.isWritable()) {
    +         final ChannelPromise channelPromise = promise.isVoid() ? channel.newPromise() : promise;
    +         future = channel.writeAndFlush(bytes, channelPromise);
    +         //if the channel is not writable, wait for the current request to be flushed, providing back-pressure on the caller thread
    +         if (!channel.isWritable() && !future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) {
    +            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    --- End diff --
    
    >> but I'm worried it could lead to a storm of users complaining that while 
sending 1 EB <<
    
    *We* are the users in this case...
    
    if a user is sending a huge message, it's our job to break it up into smaller packets...
    
    in that case, we never block inside the NettyConnector... we verify isWritable()... and stop writing...
    
    I don't think we need to be very specific about the capacity... we just put 
the bytes and resume whatever was happening before.
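
    For what it's worth, a rough sketch of that non-blocking pattern (write in smaller chunks, check isWritable() before each write, stop when the channel is saturated and resume from channelWritabilityChanged) could look like the code below. The class, chunk size and handler names are purely illustrative and not part of this PR:

        import io.netty.buffer.ByteBuf;
        import io.netty.channel.Channel;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.ChannelInboundHandlerAdapter;

        //hypothetical helper, not part of NettyConnection
        final class ChunkedWriter {

           private static final int CHUNK_BYTES = 64 * 1024; //illustrative chunk size

           private final Channel channel;
           private final ByteBuf payload;

           ChunkedWriter(Channel channel, ByteBuf payload) {
              this.channel = channel;
              this.payload = payload;
           }

           //writes chunks until the payload is exhausted or the channel stops being writable;
           //synchronization between the caller thread and the event loop is omitted for brevity
           void writeWhileWritable() {
              while (payload.isReadable() && channel.isWritable()) {
                 final int len = Math.min(CHUNK_BYTES, payload.readableBytes());
                 //readRetainedSlice advances the reader index; Netty releases the slice once written
                 channel.write(payload.readRetainedSlice(len));
              }
              channel.flush();
              //if the channel is no longer writable we simply stop: channelWritabilityChanged resumes the write later
           }

           //pipeline handler that resumes the suspended write once the outbound buffer drains
           static final class ResumeOnWritable extends ChannelInboundHandlerAdapter {

              private final ChunkedWriter writer;

              ResumeOnWritable(ChunkedWriter writer) {
                 this.writer = writer;
              }

              @Override
              public void channelWritabilityChanged(ChannelHandlerContext ctx) {
                 if (ctx.channel().isWritable()) {
                    writer.writeWhileWritable();
                 }
                 ctx.fireChannelWritabilityChanged();
              }
           }
        }

    The handler would simply be added to the pipeline alongside the writer, e.g. channel.pipeline().addLast(new ChunkedWriter.ResumeOnWritable(writer)).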
    



> OutOfDirectMemoryError raised from Netty
> ----------------------------------------
>
>                 Key: ARTEMIS-1025
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1025
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker
>            Reporter: Francesco Nigro
>            Assignee: Francesco Nigro
>
> If you send and receive a lot of messages to Artemis in a short time via the Netty
> connector, an OutOfDirectMemoryError exception is thrown on the client.
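
A minimal core-client sketch of the kind of load described above (the broker URL, queue name and message count are made up, and the target queue is assumed to already exist):

    import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
    import org.apache.activemq.artemis.api.core.client.ClientMessage;
    import org.apache.activemq.artemis.api.core.client.ClientProducer;
    import org.apache.activemq.artemis.api.core.client.ClientSession;
    import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
    import org.apache.activemq.artemis.api.core.client.ServerLocator;

    public class FastProducerRepro {

       public static void main(String[] args) throws Exception {
          //hypothetical broker URL and queue name; adjust to the actual environment
          final ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
          final ClientSessionFactory factory = locator.createSessionFactory();
          final ClientSession session = factory.createSession();
          try {
             final ClientProducer producer = session.createProducer("exampleQueue");
             final byte[] body = new byte[1024];
             //push messages as fast as possible: with the Netty connector this kind of
             //burst is what can exhaust direct memory on the client side
             for (int i = 0; i < 1_000_000; i++) {
                final ClientMessage message = session.createMessage(false);
                message.getBodyBuffer().writeBytes(body);
                producer.send(message);
             }
          } finally {
             session.close();
             factory.close();
             locator.close();
          }
       }
    }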



