wy96f commented on a change in pull request #2832: ARTEMIS-2482 Large messages could leak native ByteBuffers URL: https://github.com/apache/activemq-artemis/pull/2832#discussion_r323071598
########## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ########## @@ -832,46 +835,117 @@ public void stopReplication() { } } + static final int LARGE_MESSAGE_CHUNK_SIZE = 100 * 1024; + + private void addBytesUsingTempNativeBuffer(final SequentialFile file, final ActiveMQBuffer bytes) throws Exception { + assert file instanceof NIOSequentialFile; + //we can't use the actual content of it as it is and need to perform a copy into a direct ByteBuffer + int readableBytes = bytes.readableBytes(); + final int requiredCapacity = Math.min(LARGE_MESSAGE_CHUNK_SIZE, readableBytes); + final ByteBuf tempBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(requiredCapacity, requiredCapacity); + try { + int readerIndex = bytes.readerIndex(); + while (readableBytes > 0) { + final int size = Math.min(readableBytes, LARGE_MESSAGE_CHUNK_SIZE); + final ByteBuffer nioBytes = tempBuffer.internalNioBuffer(0, size); + final int position = nioBytes.position(); + bytes.getBytes(readerIndex, nioBytes); + nioBytes.position(position); + file.blockingWriteDirect(nioBytes, false, false); + readerIndex += size; + readableBytes -= size; + } + } finally { + tempBuffer.release(); + } + } + public final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final ActiveMQBuffer bytes) throws Exception { readLock(); try { file.position(file.size()); if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) { - final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes()); - file.blockingWriteDirect(nioBytes, false, false); - - if (isReplicated()) { - //copy defensively bytes - final byte[] bytesCopy = new byte[bytes.readableBytes()]; - bytes.getBytes(bytes.readerIndex(), bytesCopy); - replicator.largeMessageWrite(messageId, bytesCopy); + //NIO -> need direct ByteBuffers, while JDBC the opposite + if (file instanceof NIOSequentialFile) { + if 
(bytes.byteBuf().isDirect()) { + final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes()); Review comment: Do we need to check that nioBufferCount() == 1 first? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services