ibessonov commented on code in PR #6323: URL: https://github.com/apache/ignite-3/pull/6323#discussion_r2231335350
########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java: ########## @@ -76,29 +96,108 @@ protected void encode(ChannelHandlerContext ctx, OutNetworkObject msg, List<Obje writerAttr.set(writer); } - out.add(new NetworkMessageChunkedInput(msg, serializationService, writer)); + return writer; + } + + /** + * Adds new message to the latest chunk if that's possible. Appends it to output list if it's not possible. + */ + private void appendMessage(OutNetworkObject msg, List<Object> out, Channel channel, MessageWriter writer) { + Attribute<NetworkMessageChunkedInput> chunkAttr = channel.attr(CHUNK_KEY); + NetworkMessageChunkedInput chunkedInput = chunkAttr.get(); + + if (chunkedInput == null) { + appendNewChunkedInput(msg, out, writer, chunkAttr); + } else { + while (true) { + ChunkState curState = chunkedInput.state.get(); + + if (curState.finished || curState.size >= MAX_MESSAGES_IN_CHUNK) { + appendNewChunkedInput(msg, out, writer, chunkAttr); + + return; + } else if (chunkedInput.state.compareAndSet(curState, curState.append(msg))) { + return; + } + } + } + } + + private void appendNewChunkedInput( + OutNetworkObject msg, + List<Object> out, + MessageWriter writer, + Attribute<NetworkMessageChunkedInput> chunkAttr + ) { + NetworkMessageChunkedInput chunkedInput = new NetworkMessageChunkedInput(msg, serializationService, writer); + + chunkAttr.set(chunkedInput); + out.add(chunkedInput); + } + + private static final class ChunkState { + final OutNetworkObject[] messages; + final int size; + final boolean finished; + + private ChunkState(OutNetworkObject[] messages, int size, boolean finished) { + this.messages = messages; + this.size = size; + this.finished = finished; + } + + static ChunkState newState(OutNetworkObject msg) { + OutNetworkObject[] messages = new OutNetworkObject[8]; + messages[0] = msg; + + return new ChunkState(messages, 1, false); + } + + ChunkState append(OutNetworkObject msg) { + assert size < MAX_MESSAGES_IN_CHUNK : "ChunkState size should be less than " + MAX_MESSAGES_IN_CHUNK + ", but was " + size; + if (size < messages.length) { + messages[size] = msg; + + return new ChunkState(messages, size + 1, false); Review Comment: This is intended -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org