Copilot commented on code in PR #6323: URL: https://github.com/apache/ignite-3/pull/6323#discussion_r2231173415
########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java: ########## @@ -48,11 +52,17 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> { private static final int IO_BUFFER_CAPACITY = 16 * 1024; + /** Max number of messages in a single chink. */ Review Comment: Typo in comment: 'chink' should be 'chunk'. ```suggestion /** Max number of messages in a single chunk. */ ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java: ########## @@ -140,12 +246,24 @@ private NetworkMessageChunkedInput( } this.serializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType()); - this.writer = writer; + } + + private void cleanupMessage() { + // Best effort fast cleanup. Won't work if array is concurrently reallocated. + // Non-volatile write is fine, no one will read this value anymore. + this.state.get().messages[currentMessageIndex] = null; Review Comment: Race condition: accessing state.get().messages without proper synchronization could lead to accessing a stale or reallocated array. Consider using a local variable to capture the current state before accessing the messages array. ```suggestion ChunkState currentState = this.state.get(); currentState.messages[currentMessageIndex] = null; ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java: ########## @@ -76,29 +96,108 @@ protected void encode(ChannelHandlerContext ctx, OutNetworkObject msg, List<Obje writerAttr.set(writer); } - out.add(new NetworkMessageChunkedInput(msg, serializationService, writer)); + return writer; + } + + /** + * Adds new message to the latest chunk if that's possible. Appends it to output list if it's not possible. + */ + private void appendMessage(OutNetworkObject msg, List<Object> out, Channel channel, MessageWriter writer) { + Attribute<NetworkMessageChunkedInput> chunkAttr = channel.attr(CHUNK_KEY); + NetworkMessageChunkedInput chunkedInput = chunkAttr.get(); + + if (chunkedInput == null) { + appendNewChunkedInput(msg, out, writer, chunkAttr); + } else { + while (true) { + ChunkState curState = chunkedInput.state.get(); + + if (curState.finished || curState.size >= MAX_MESSAGES_IN_CHUNK) { + appendNewChunkedInput(msg, out, writer, chunkAttr); + + return; + } else if (chunkedInput.state.compareAndSet(curState, curState.append(msg))) { + return; + } + } + } + } + + private void appendNewChunkedInput( + OutNetworkObject msg, + List<Object> out, + MessageWriter writer, + Attribute<NetworkMessageChunkedInput> chunkAttr + ) { + NetworkMessageChunkedInput chunkedInput = new NetworkMessageChunkedInput(msg, serializationService, writer); + + chunkAttr.set(chunkedInput); + out.add(chunkedInput); + } + + private static final class ChunkState { + final OutNetworkObject[] messages; + final int size; + final boolean finished; + + private ChunkState(OutNetworkObject[] messages, int size, boolean finished) { + this.messages = messages; + this.size = size; + this.finished = finished; + } + + static ChunkState newState(OutNetworkObject msg) { + OutNetworkObject[] messages = new OutNetworkObject[8]; + messages[0] = msg; + + return new ChunkState(messages, 1, false); + } + + ChunkState append(OutNetworkObject msg) { + assert size < MAX_MESSAGES_IN_CHUNK : "ChunkState size should be less than " + MAX_MESSAGES_IN_CHUNK + ", but was " + size; + if (size < messages.length) { + messages[size] = msg; + + return new ChunkState(messages, size + 1, false); Review Comment: The existing messages array is being mutated and then shared with a new ChunkState instance. This creates a race condition where concurrent reads of the old state could see the modified array. The array should be copied to ensure immutability. ```suggestion return new ChunkState(Arrays.copyOf(messages, messages.length), size + 1, false); ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java: ########## @@ -170,34 +288,57 @@ public ByteBuf readChunk(ByteBufAllocator allocator) { writer.setBuffer(byteBuffer); - while (byteBuffer.hasRemaining()) { + writeMessages(); + + buffer.writerIndex(byteBuffer.position() - initialPosition); + + // Do not hold a reference, might help GC to do its job better. + writer.setBuffer(EMPTY_BYTE_BUFFER); + + return buffer; + } + + /** + * Tries to write as many messages from {@link #state} as possible, until either buffer exhaustion or messages exhaustion. + */ + private void writeMessages() { + while (true) { if (!descriptorsFinished) { descriptorsFinished = descriptorSerializer.writeMessage(descriptors, writer); - if (descriptorsFinished) { - for (ClassDescriptorMessage classDescriptorMessage : descriptors.messages()) { - serializationService.addSentDescriptor(classDescriptorMessage.descriptorId()); - } - writer.reset(); - } else { - break; + if (!descriptorsFinished) { + return; } - } else { - finished = serializer.writeMessage(msg, writer); - if (finished) { - writer.reset(); + for (ClassDescriptorMessage classDescriptorMessage : descriptors.messages()) { + serializationService.addSentDescriptor(classDescriptorMessage.descriptorId()); } - break; + writer.reset(); } - } - buffer.writerIndex(byteBuffer.position() - initialPosition); + boolean messageFinished = serializer.writeMessage(msg, writer); + if (!messageFinished) { + return; + } - // Do not hold a reference, might help GC to do its job better. - writer.setBuffer(EMPTY_BYTE_BUFFER); + writer.reset(); - return buffer; + cleanupMessage(); + currentMessageIndex++; + + // Message is successfully written, now we must check if there's another message in our chunk. + while (true) { + ChunkState curState = state.get(); + + if (currentMessageIndex < curState.size) { + prepareMessage(curState.messages[currentMessageIndex]); + + break; Review Comment: Race condition: accessing curState.messages[currentMessageIndex] without bounds checking could cause ArrayIndexOutOfBoundsException if the array was reallocated between the size check and access. Consider adding bounds validation. ```suggestion // Validate bounds before accessing the array if (currentMessageIndex >= 0 && currentMessageIndex < curState.messages.length) { prepareMessage(curState.messages[currentMessageIndex]); break; } else { throw new ArrayIndexOutOfBoundsException("Invalid currentMessageIndex: " + currentMessageIndex); } ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java: ########## @@ -76,29 +96,108 @@ protected void encode(ChannelHandlerContext ctx, OutNetworkObject msg, List<Obje writerAttr.set(writer); } - out.add(new NetworkMessageChunkedInput(msg, serializationService, writer)); + return writer; + } + + /** + * Adds new message to the latest chunk if that's possible. Appends it to output list if it's not possible. + */ + private void appendMessage(OutNetworkObject msg, List<Object> out, Channel channel, MessageWriter writer) { + Attribute<NetworkMessageChunkedInput> chunkAttr = channel.attr(CHUNK_KEY); + NetworkMessageChunkedInput chunkedInput = chunkAttr.get(); + + if (chunkedInput == null) { + appendNewChunkedInput(msg, out, writer, chunkAttr); + } else { + while (true) { + ChunkState curState = chunkedInput.state.get(); + + if (curState.finished || curState.size >= MAX_MESSAGES_IN_CHUNK) { + appendNewChunkedInput(msg, out, writer, chunkAttr); + + return; + } else if (chunkedInput.state.compareAndSet(curState, curState.append(msg))) { + return; + } + } + } + } + + private void appendNewChunkedInput( + OutNetworkObject msg, + List<Object> out, + MessageWriter writer, + Attribute<NetworkMessageChunkedInput> chunkAttr + ) { + NetworkMessageChunkedInput chunkedInput = new NetworkMessageChunkedInput(msg, serializationService, writer); + + chunkAttr.set(chunkedInput); + out.add(chunkedInput); + } + + private static final class ChunkState { + final OutNetworkObject[] messages; + final int size; + final boolean finished; + + private ChunkState(OutNetworkObject[] messages, int size, boolean finished) { + this.messages = messages; + this.size = size; + this.finished = finished; + } + + static ChunkState newState(OutNetworkObject msg) { + OutNetworkObject[] messages = new OutNetworkObject[8]; + messages[0] = msg; + + return new ChunkState(messages, 1, false); + } + + ChunkState append(OutNetworkObject msg) { + assert size < MAX_MESSAGES_IN_CHUNK : "ChunkState size should be less than " + MAX_MESSAGES_IN_CHUNK + ", but was " + size; + if (size < messages.length) { + messages[size] = msg; + + return new ChunkState(messages, size + 1, false); + } else { + OutNetworkObject[] newMessages = Arrays.copyOf(messages, Math.min(MAX_MESSAGES_IN_CHUNK, size + (size >> 1))); + newMessages[size] = msg; + + return new ChunkState(newMessages, size + 1, false); + } + } + + ChunkState finish() { + return new ChunkState(messages, size, true); Review Comment: The finish() method shares the same messages array reference between the old and new ChunkState instances. This violates immutability and could lead to concurrent modification issues. The array should be defensively copied. ```suggestion OutNetworkObject[] copiedMessages = Arrays.copyOf(messages, size); return new ChunkState(copiedMessages, size, true); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org