ibessonov commented on code in PR #6323:
URL: https://github.com/apache/ignite-3/pull/6323#discussion_r2231335938


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java:
##########
@@ -76,29 +96,108 @@ protected void encode(ChannelHandlerContext ctx, OutNetworkObject msg, List<Obje
             writerAttr.set(writer);
         }
 
-        out.add(new NetworkMessageChunkedInput(msg, serializationService, writer));
+        return writer;
+    }
+
+    /**
+     * Adds new message to the latest chunk if that's possible. Appends it to output list if it's not possible.
+     */
+    private void appendMessage(OutNetworkObject msg, List<Object> out, Channel channel, MessageWriter writer) {
+        Attribute<NetworkMessageChunkedInput> chunkAttr = channel.attr(CHUNK_KEY);
+        NetworkMessageChunkedInput chunkedInput = chunkAttr.get();
+
+        if (chunkedInput == null) {
+            appendNewChunkedInput(msg, out, writer, chunkAttr);
+        } else {
+            while (true) {
+                ChunkState curState = chunkedInput.state.get();
+
+                if (curState.finished || curState.size >= MAX_MESSAGES_IN_CHUNK) {
+                    appendNewChunkedInput(msg, out, writer, chunkAttr);
+
+                    return;
+                } else if (chunkedInput.state.compareAndSet(curState, curState.append(msg))) {
+                    return;
+                }
+            }
+        }
+    }
+
+    private void appendNewChunkedInput(
+            OutNetworkObject msg,
+            List<Object> out,
+            MessageWriter writer,
+            Attribute<NetworkMessageChunkedInput> chunkAttr
+    ) {
+        NetworkMessageChunkedInput chunkedInput = new NetworkMessageChunkedInput(msg, serializationService, writer);
+
+        chunkAttr.set(chunkedInput);
+        out.add(chunkedInput);
+    }
+
+    private static final class ChunkState {
+        final OutNetworkObject[] messages;
+        final int size;
+        final boolean finished;
+
+        private ChunkState(OutNetworkObject[] messages, int size, boolean finished) {
+            this.messages = messages;
+            this.size = size;
+            this.finished = finished;
+        }
+
+        static ChunkState newState(OutNetworkObject msg) {
+            OutNetworkObject[] messages = new OutNetworkObject[8];
+            messages[0] = msg;
+
+            return new ChunkState(messages, 1, false);
+        }
+
+        ChunkState append(OutNetworkObject msg) {
+            assert size < MAX_MESSAGES_IN_CHUNK : "ChunkState size should be less than " + MAX_MESSAGES_IN_CHUNK + ", but was " + size;
+            if (size < messages.length) {
+                messages[size] = msg;
+
+                return new ChunkState(messages, size + 1, false);
+            } else {
+                OutNetworkObject[] newMessages = Arrays.copyOf(messages, Math.min(MAX_MESSAGES_IN_CHUNK, size + (size >> 1)));
+                newMessages[size] = msg;
+
+                return new ChunkState(newMessages, size + 1, false);
+            }
+        }
+
+        ChunkState finish() {
+            return new ChunkState(messages, size, true);

Review Comment:
   This is intended



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to