Copilot commented on code in PR #7894:
URL: https://github.com/apache/ignite-3/pull/7894#discussion_r3000929632


##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java:
##########
@@ -667,24 +670,33 @@ private void writeHandshakeResponse(BitSet 
mutuallySupportedFeatures, ClientMess
         throw new UnsupportedAuthenticationTypeException("Unsupported 
authentication type: " + authnType);
     }
 
-    private void writeAndFlush(ClientMessagePacker packer, 
ChannelHandlerContext ctx) {
+    private boolean writeAndFlush(ClientMessagePacker packer, 
ChannelHandlerContext ctx, ResponseWriteGuard guard) {
         var buf = packer.getBuffer();
         int bytes = buf.readableBytes();
 
         try {
-            // writeAndFlush releases pooled buffer.
-            ctx.writeAndFlush(buf);
+            // write releases pooled buffer.
+            if (!guard.write(ctx, buf)) {
+                // Response for this request has already been sent.
+                // Example: exception after response write, catch block in 
processOperation tries to write an error
+                //          => duplicate response. Guard prevents this.
+                packer.close();
+                return false;
+            }
+
+            ctx.flush();
         } catch (Throwable t) {
             packer.close();
             throw t;
         }
 
         metrics.bytesSentAdd(bytes);
+        return true;
     }
 
-    private void writeAndFlushWithMagic(ClientMessagePacker packer, 
ChannelHandlerContext ctx) {
+    private void writeAndFlushWithMagic(ClientMessagePacker packer, 
ChannelHandlerContext ctx, ResponseWriteGuard guard) {
         ctx.write(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
-        writeAndFlush(packer, ctx);
+        writeAndFlush(packer, ctx, guard);
         metrics.bytesSentAdd(ClientMessageCommon.MAGIC_BYTES.length);
     }

Review Comment:
   `writeAndFlushWithMagic` writes MAGIC_BYTES to the channel *before* checking 
`ResponseWriteGuard`. If `guard.write(...)` rejects the actual payload write, 
the magic buffer remains queued (and may be flushed by a later write), 
producing a corrupted/partial response on the wire and also overcounting 
`bytesSent`.
   
   Guarding needs to be applied to the *entire* response (magic + payload) 
atomically (e.g., check/flip the guard before any writes, or write a single 
composite buffer containing magic + payload), and 
`bytesSentAdd(MAGIC_BYTES.length)` should only happen when the response was 
actually written.



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java:
##########
@@ -862,22 +884,33 @@ private void processOperation(ChannelHandlerContext ctx, 
ClientMessageUnpacker i
 
                 partitionOperationsExecutor.execute(() -> {
                     try {
-                        processOperationInternal(ctx, in, requestId0, opCode0);
+                        processOperationInternal(ctx, in, requestId0, opCode0, 
guard);
                     } catch (Throwable t) {
                         in.close();
 
-                        writeError(requestId0, opCode0, t, ctx, false);
+                        writeError(requestId0, opCode0, t, ctx, false, guard);
 
                         metrics.requestsFailedIncrement();
                     }
                 });
             } else {
-                processOperationInternal(ctx, in, requestId, opCode);
+                processOperationInternal(ctx, in, requestId, opCode, guard);
             }
         } catch (Throwable t) {
             in.close();
 
-            writeError(requestId, opCode, t, ctx, false);
+            // If we failed to read the request ID, we cannot send a valid 
error response.
+            // Close the connection instead.
+            if (requestId == -1) {
+                LOG.warn("Failed to read client request, closing connection 
[connectionId={}, remoteAddress={}]",
+                        t, connectionId, ctx.channel().remoteAddress());
+

Review Comment:
   In the `requestId == -1` error path, `processOperation` returns early after 
`requestsActiveIncrement()` without a corresponding 
`requestsActiveDecrement()`. This will leak the "active requests" metric for 
malformed/partial messages.
   
   Consider ensuring `requestsActiveDecrement()` is always executed on all exit 
paths (e.g., decrement before returning here or restructure with a `finally`).
   ```suggestion
   
                   metrics.requestsActiveDecrement();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to