[ https://issues.apache.org/jira/browse/ARTEMIS-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957044#comment-15957044 ]
ASF GitHub Bot commented on ARTEMIS-1025: ----------------------------------------- Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/1119#discussion_r109950463 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java --- @@ -211,143 +251,203 @@ public void run() { @Override public ActiveMQBuffer createTransportBuffer(final int size) { - return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true); + try { + return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); + } catch (OutOfMemoryError oom) { + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { + final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark); + ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom); + } + throw oom; + } } @Override - public Object getID() { + public final Object getID() { // TODO: Think of it return channel.hashCode(); } // This is called periodically to flush the batch buffer @Override - public void checkFlushBatchBuffer() { - if (!batchingEnabled) { - return; - } - - if (writeLock.tryAcquire()) { - try { - if (batchBuffer != null && batchBuffer.readable()) { - channel.writeAndFlush(batchBuffer.byteBuf()); - - batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE); - } - } finally { - writeLock.release(); + public final void checkFlushBatchBuffer() { + if (this.batchingEnabled) { + //perform the flush only if necessary + final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark); + if (batchBufferSize > 0) { + this.channel.flush(); } } } @Override - public void write(final ActiveMQBuffer buffer) { + public final void write(final ActiveMQBuffer buffer) { write(buffer, false, false); } @Override - public void 
write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) { + public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) { write(buffer, flush, batched, null); } @Override - public void write(ActiveMQBuffer buffer, - final boolean flush, - final boolean batched, - final ChannelFutureListener futureListener) { - - try { - writeLock.acquire(); - - try { - if (batchBuffer == null && batchingEnabled && batched && !flush) { - // Lazily create batch buffer - - batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE); - } - - if (batchBuffer != null) { - batchBuffer.writeBytes(buffer, 0, buffer.writerIndex()); - - if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush) { - // If the batch buffer is full or it's flush param or not batched then flush the buffer - - buffer = batchBuffer; - } else { - return; - } - - if (!batched || flush) { - batchBuffer = null; - } else { - // Create a new buffer + public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + final boolean isAllowedToBlock = isAllowedToBlock(); + if (!isAllowedToBlock) { + return canWrite(requiredCapacity); + } else { + final long timeoutNanos = timeUnit.toNanos(timeout); + final long deadline = System.nanoTime() + timeoutNanos; + //choose wait time unit size + final long parkNanos; + //if is requested to wait more than a millisecond than we could use + if (timeoutNanos >= 1_000_000L) { + parkNanos = 100_000L; + } else { + //reduce it doesn't make sense, only a spin loop could be enough precise with the most OS + parkNanos = 1000L; + } + boolean canWrite; + while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) { + LockSupport.parkNanos(parkNanos); + } + return canWrite; + } + } - batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE); - } - } + private boolean isAllowedToBlock() { + final EventLoop eventLoop = channel.eventLoop(); + final 
boolean inEventLoop = eventLoop.inEventLoop(); --- End diff -- My concern is what happens when you are in the event loop for another connection: the Acceptor receives data, replication sends to the backup... and blocks... so the main receiver blocks! > OutOfDirectMemoryError raised from Netty > ---------------------------------------- > > Key: ARTEMIS-1025 > URL: https://issues.apache.org/jira/browse/ARTEMIS-1025 > Project: ActiveMQ Artemis > Issue Type: Bug > Components: Broker > Reporter: Francesco Nigro > Assignee: Francesco Nigro > > If you send and receive a lot of messages in a short time to Artemis via Netty > connector, the OutOfDirectMemoryError exception is thrown from the client. -- This message was sent by Atlassian JIRA (v6.3.15#6346)