Copilot commented on code in PR #6323:
URL: https://github.com/apache/ignite-3/pull/6323#discussion_r2231173415


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java:
##########
@@ -48,11 +52,17 @@ public class OutboundEncoder extends 
MessageToMessageEncoder<OutNetworkObject> {
 
     private static final int IO_BUFFER_CAPACITY = 16 * 1024;
 
+    /** Max number of messages in a single chink. */

Review Comment:
   Typo in comment: 'chink' should be 'chunk'.
   ```suggestion
       /** Max number of messages in a single chunk. */
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java:
##########
@@ -140,12 +246,24 @@ private NetworkMessageChunkedInput(
             }
 
             this.serializer = 
serializationService.createMessageSerializer(msg.groupType(), 
msg.messageType());
-            this.writer = writer;
+        }
+
+        private void cleanupMessage() {
+            // Best effort fast cleanup. Won't work if array is concurrently 
reallocated.
+            // Non-volatile write is fine, no one will read this value anymore.
+            this.state.get().messages[currentMessageIndex] = null;

Review Comment:
   Race condition: accessing state.get().messages without proper 
synchronization could lead to accessing a stale or reallocated array. Consider 
using a local variable to capture the current state before accessing the 
messages array.
   ```suggestion
               ChunkState currentState = this.state.get();
               currentState.messages[currentMessageIndex] = null;
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java:
##########
@@ -76,29 +96,108 @@ protected void encode(ChannelHandlerContext ctx, 
OutNetworkObject msg, List<Obje
             writerAttr.set(writer);
         }
 
-        out.add(new NetworkMessageChunkedInput(msg, serializationService, 
writer));
+        return writer;
+    }
+
+    /**
+     * Adds new message to the latest chunk if that's possible. Appends it to 
output list if it's not possible.
+     */
+    private void appendMessage(OutNetworkObject msg, List<Object> out, Channel 
channel, MessageWriter writer) {
+        Attribute<NetworkMessageChunkedInput> chunkAttr = 
channel.attr(CHUNK_KEY);
+        NetworkMessageChunkedInput chunkedInput = chunkAttr.get();
+
+        if (chunkedInput == null) {
+            appendNewChunkedInput(msg, out, writer, chunkAttr);
+        } else {
+            while (true) {
+                ChunkState curState = chunkedInput.state.get();
+
+                if (curState.finished || curState.size >= 
MAX_MESSAGES_IN_CHUNK) {
+                    appendNewChunkedInput(msg, out, writer, chunkAttr);
+
+                    return;
+                } else if (chunkedInput.state.compareAndSet(curState, 
curState.append(msg))) {
+                    return;
+                }
+            }
+        }
+    }
+
+    private void appendNewChunkedInput(
+            OutNetworkObject msg,
+            List<Object> out,
+            MessageWriter writer,
+            Attribute<NetworkMessageChunkedInput> chunkAttr
+    ) {
+        NetworkMessageChunkedInput chunkedInput = new 
NetworkMessageChunkedInput(msg, serializationService, writer);
+
+        chunkAttr.set(chunkedInput);
+        out.add(chunkedInput);
+    }
+
+    private static final class ChunkState {
+        final OutNetworkObject[] messages;
+        final int size;
+        final boolean finished;
+
+        private ChunkState(OutNetworkObject[] messages, int size, boolean 
finished) {
+            this.messages = messages;
+            this.size = size;
+            this.finished = finished;
+        }
+
+        static ChunkState newState(OutNetworkObject msg) {
+            OutNetworkObject[] messages = new OutNetworkObject[8];
+            messages[0] = msg;
+
+            return new ChunkState(messages, 1, false);
+        }
+
+        ChunkState append(OutNetworkObject msg) {
+            assert size < MAX_MESSAGES_IN_CHUNK : "ChunkState size should be 
less than " + MAX_MESSAGES_IN_CHUNK + ", but was " + size;
+            if (size < messages.length) {
+                messages[size] = msg;
+
+                return new ChunkState(messages, size + 1, false);

Review Comment:
   The existing messages array is being mutated and then shared with a new 
ChunkState instance. This creates a race condition where concurrent reads of 
the old state could see the modified array. The array should be copied to 
ensure immutability.
   ```suggestion
                   return new ChunkState(Arrays.copyOf(messages, 
messages.length), size + 1, false);
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java:
##########
@@ -170,34 +288,57 @@ public ByteBuf readChunk(ByteBufAllocator allocator) {
 
             writer.setBuffer(byteBuffer);
 
-            while (byteBuffer.hasRemaining()) {
+            writeMessages();
+
+            buffer.writerIndex(byteBuffer.position() - initialPosition);
+
+            // Do not hold a reference, might help GC to do its job better.
+            writer.setBuffer(EMPTY_BYTE_BUFFER);
+
+            return buffer;
+        }
+
+        /**
+         * Tries to write as many messages from {@link #state} as possible, 
until either buffer exhaustion or messages exhaustion.
+         */
+        private void writeMessages() {
+            while (true) {
                 if (!descriptorsFinished) {
                     descriptorsFinished = 
descriptorSerializer.writeMessage(descriptors, writer);
-                    if (descriptorsFinished) {
-                        for (ClassDescriptorMessage classDescriptorMessage : 
descriptors.messages()) {
-                            
serializationService.addSentDescriptor(classDescriptorMessage.descriptorId());
-                        }
-                        writer.reset();
-                    } else {
-                        break;
+                    if (!descriptorsFinished) {
+                        return;
                     }
-                } else {
-                    finished = serializer.writeMessage(msg, writer);
 
-                    if (finished) {
-                        writer.reset();
+                    for (ClassDescriptorMessage classDescriptorMessage : 
descriptors.messages()) {
+                        
serializationService.addSentDescriptor(classDescriptorMessage.descriptorId());
                     }
 
-                    break;
+                    writer.reset();
                 }
-            }
 
-            buffer.writerIndex(byteBuffer.position() - initialPosition);
+                boolean messageFinished = serializer.writeMessage(msg, writer);
+                if (!messageFinished) {
+                    return;
+                }
 
-            // Do not hold a reference, might help GC to do its job better.
-            writer.setBuffer(EMPTY_BYTE_BUFFER);
+                writer.reset();
 
-            return buffer;
+                cleanupMessage();
+                currentMessageIndex++;
+
+                // Message is successfully written, now we must check if 
there's another message in our chunk.
+                while (true) {
+                    ChunkState curState = state.get();
+
+                    if (currentMessageIndex < curState.size) {
+                        prepareMessage(curState.messages[currentMessageIndex]);
+
+                        break;

Review Comment:
   Race condition: accessing curState.messages[currentMessageIndex] without 
bounds checking could cause ArrayIndexOutOfBoundsException if the array was 
reallocated between the size check and access. Consider adding bounds 
validation.
   ```suggestion
                           // Validate bounds before accessing the array
                           if (currentMessageIndex >= 0 && currentMessageIndex 
< curState.messages.length) {
                               
prepareMessage(curState.messages[currentMessageIndex]);
                               break;
                           } else {
                               throw new 
ArrayIndexOutOfBoundsException("Invalid currentMessageIndex: " + 
currentMessageIndex);
                           }
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java:
##########
@@ -76,29 +96,108 @@ protected void encode(ChannelHandlerContext ctx, 
OutNetworkObject msg, List<Obje
             writerAttr.set(writer);
         }
 
-        out.add(new NetworkMessageChunkedInput(msg, serializationService, 
writer));
+        return writer;
+    }
+
+    /**
+     * Adds new message to the latest chunk if that's possible. Appends it to 
output list if it's not possible.
+     */
+    private void appendMessage(OutNetworkObject msg, List<Object> out, Channel 
channel, MessageWriter writer) {
+        Attribute<NetworkMessageChunkedInput> chunkAttr = 
channel.attr(CHUNK_KEY);
+        NetworkMessageChunkedInput chunkedInput = chunkAttr.get();
+
+        if (chunkedInput == null) {
+            appendNewChunkedInput(msg, out, writer, chunkAttr);
+        } else {
+            while (true) {
+                ChunkState curState = chunkedInput.state.get();
+
+                if (curState.finished || curState.size >= 
MAX_MESSAGES_IN_CHUNK) {
+                    appendNewChunkedInput(msg, out, writer, chunkAttr);
+
+                    return;
+                } else if (chunkedInput.state.compareAndSet(curState, 
curState.append(msg))) {
+                    return;
+                }
+            }
+        }
+    }
+
+    private void appendNewChunkedInput(
+            OutNetworkObject msg,
+            List<Object> out,
+            MessageWriter writer,
+            Attribute<NetworkMessageChunkedInput> chunkAttr
+    ) {
+        NetworkMessageChunkedInput chunkedInput = new 
NetworkMessageChunkedInput(msg, serializationService, writer);
+
+        chunkAttr.set(chunkedInput);
+        out.add(chunkedInput);
+    }
+
+    private static final class ChunkState {
+        final OutNetworkObject[] messages;
+        final int size;
+        final boolean finished;
+
+        private ChunkState(OutNetworkObject[] messages, int size, boolean 
finished) {
+            this.messages = messages;
+            this.size = size;
+            this.finished = finished;
+        }
+
+        static ChunkState newState(OutNetworkObject msg) {
+            OutNetworkObject[] messages = new OutNetworkObject[8];
+            messages[0] = msg;
+
+            return new ChunkState(messages, 1, false);
+        }
+
+        ChunkState append(OutNetworkObject msg) {
+            assert size < MAX_MESSAGES_IN_CHUNK : "ChunkState size should be 
less than " + MAX_MESSAGES_IN_CHUNK + ", but was " + size;
+            if (size < messages.length) {
+                messages[size] = msg;
+
+                return new ChunkState(messages, size + 1, false);
+            } else {
+                OutNetworkObject[] newMessages = Arrays.copyOf(messages, 
Math.min(MAX_MESSAGES_IN_CHUNK, size + (size >> 1)));
+                newMessages[size] = msg;
+
+                return new ChunkState(newMessages, size + 1, false);
+            }
+        }
+
+        ChunkState finish() {
+            return new ChunkState(messages, size, true);

Review Comment:
   The finish() method shares the same messages array reference between the old 
and new ChunkState instances. This violates immutability and could lead to 
concurrent modification issues. The array should be defensively copied.
   ```suggestion
               OutNetworkObject[] copiedMessages = Arrays.copyOf(messages, 
size);
               return new ChunkState(copiedMessages, size, true);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to