SteNicholas opened a new pull request, #3649:
URL: https://github.com/apache/celeborn/pull/3649

   ### What changes were proposed in this pull request?
   
   This PR enables zero-copy `sendfile()` for `FileRegion` bodies on Netty's native transports: when the body is a `FileRegion` backed by a `FileSegmentManagedBuffer`, `MessageEncoder` now emits the header `ByteBuf` and the `FileRegion` as **separate objects** in the outbound message list, instead of wrapping them together in a `MessageWithHeader`.
   
   Previously, all messages with a body were unconditionally wrapped in `MessageWithHeader`. This forced native transports (EPOLL, KQUEUE) onto a generic `FileRegion.transferTo()` fallback path that copies data through user space, bypassing the optimized `sendfile()` / `splice()` zero-copy path that Netty's native transports provide.
   
   The split is only applied when the `ManagedBuffer` is a 
`FileSegmentManagedBuffer`, whose `release()` is a no-op, making it safe to 
emit the `FileRegion` independently of write lifecycle management. Other 
`ManagedBuffer` types (e.g., `BlockManagerManagedBuffer`) still use the 
`MessageWithHeader` wrapper because they perform resource cleanup in 
`release()` that must be tied to `MessageWithHeader.deallocate()`.
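   The split decision described above can be sketched with minimal stand-in types. This is not Celeborn's actual `MessageEncoder`; every class below other than the names quoted from the PR is an invented, simplified stand-in used only to illustrate the branching logic.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class EncoderSplitSketch {
       // Stand-ins for the buffer types discussed in the PR.
       interface ManagedBuffer {}
       static class FileSegmentManagedBuffer implements ManagedBuffer {} // release() is a no-op
       static class NettyManagedBuffer implements ManagedBuffer {}       // release() frees resources

       static class Header {}
       static class FileRegionBody {}
       // Wrapper that ties body cleanup to its deallocate() lifecycle.
       static class MessageWithHeader {
           MessageWithHeader(Header header, Object body) {}
       }

       /** Emit either [header, region] separately, or one MessageWithHeader wrapper. */
       static List<Object> encode(Header header, Object body, ManagedBuffer source) {
           List<Object> out = new ArrayList<>();
           if (source instanceof FileSegmentManagedBuffer) {
               // Safe to split: release() is a no-op, so the region's lifecycle
               // needs no coupling to the header write.
               out.add(header);
               out.add(body);
           } else {
               // Other buffer types must clean up in deallocate(), so keep wrapping.
               out.add(new MessageWithHeader(header, body));
           }
           return out;
       }

       public static void main(String[] args) {
           System.out.println(encode(new Header(), new FileRegionBody(),
                   new FileSegmentManagedBuffer()).size()); // 2: header and region emitted separately
           System.out.println(encode(new Header(), new FileRegionBody(),
                   new NettyManagedBuffer()).size());       // 1: single MessageWithHeader
       }
   }
   ```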
   
   ### Why are the changes needed?
   
   When using native transports (AUTO/EPOLL on Linux), file-backed shuffle 
fetch performance was severely degraded compared to NIO mode. The root cause 
lies in how Netty's native transports dispatch `FileRegion` writes.
   
   In `AbstractEpollStreamChannel.doWriteSingle()` (and the analogous KQueue 
path), Netty uses an `instanceof` check to choose between two write strategies:
   
   
https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L474-L493
   
   ```java
    } else if (msg instanceof DefaultFileRegion) {
        return writeDefaultFileRegion(in, (DefaultFileRegion) msg); // → socket.sendFile() (zero-copy)
    } else if (msg instanceof FileRegion) {
        return writeFileRegion(in, (FileRegion) msg);               // → region.transferTo() (user-space copy)
    }
   ```
   
   - **`writeDefaultFileRegion()`** calls `socket.sendFile()`, which maps 
directly to the Linux `sendfile()` syscall — a true zero-copy path where data 
is transferred from the file page cache to the socket buffer entirely within 
the kernel, with no user-space copy.
   
   
https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L367-L386
   
   ```java
    private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
        final long offset = region.transferred();
        final long regionCount = region.count();
        if (offset >= regionCount) {
            in.remove();
            return 0;
        }

        final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
        if (flushedAmount > 0) {
            in.progress(flushedAmount);
            if (region.transferred() >= regionCount) {
                in.remove();
            }
            return 1;
        } else if (flushedAmount == 0) {
            validateFileRegion(region, offset);
        }
        return WRITE_STATUS_SNDBUF_FULL;
    }
   ```
   
   - **`writeFileRegion()`** falls back to 
`region.transferTo(WritableByteChannel)`, which writes data through a 
`SocketWritableByteChannel` wrapper — effectively a user-space copy path.
   
   
https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L402-L420
   
   ```java
    private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
        if (region.transferred() >= region.count()) {
            in.remove();
            return 0;
        }

        if (byteChannel == null) {
            byteChannel = new EpollSocketWritableByteChannel();
        }
        final long flushedAmount = region.transferTo(byteChannel, region.transferred());
        if (flushedAmount > 0) {
            in.progress(flushedAmount);
            if (region.transferred() >= region.count()) {
                in.remove();
            }
            return 1;
        }
        return WRITE_STATUS_SNDBUF_FULL;
    }
   ```
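   For intuition about the two paths, `java.nio.channels.FileChannel.transferTo()` is the JDK-level analogue: when the target channel supports it, the JVM can delegate the transfer to the kernel rather than copying through a user-space buffer. The sketch below only demonstrates the API shape with a file target; whether the kernel fast path is taken depends on the platform and target channel type.

   ```java
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;

   public class TransferToSketch {
       public static void main(String[] args) throws IOException {
           Path src = Files.createTempFile("src", ".bin");
           Path dst = Files.createTempFile("dst", ".bin");
           Files.write(src, new byte[] {1, 2, 3, 4});
           try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
                FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
               // Ask the channel to move bytes directly; may use a kernel fast path.
               long moved = in.transferTo(0, in.size(), out);
               System.out.println(moved); // 4
           } finally {
               Files.deleteIfExists(src);
               Files.deleteIfExists(dst);
           }
       }
   }
   ```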
   
   Spark's `MessageWithHeader extends AbstractFileRegion` (not 
`DefaultFileRegion`). When `MessageEncoder` wraps a `DefaultFileRegion` body 
inside `MessageWithHeader`, the resulting object is a generic `FileRegion` from 
Netty's perspective. This means Netty dispatches it to the `writeFileRegion()` 
fallback, which calls `MessageWithHeader.transferTo()`:
   
   ```java
    // MessageWithHeader.java, line 121
    if (body instanceof FileRegion fileRegion) {
        writtenBody = fileRegion.transferTo(target, totalBytesTransferred - headerLength);
    }
   ```
   
   Here, even though the inner body is a `DefaultFileRegion`, its 
`transferTo()` is invoked with a `WritableByteChannel` (not a file descriptor), 
so the data is read from the file into a user-space buffer and then written to 
the socket — **the zero-copy opportunity is lost**.
   
   By emitting the `DefaultFileRegion` directly into Netty's outbound buffer 
(instead of wrapping it in `MessageWithHeader`), Netty's native transport 
recognizes it via `instanceof DefaultFileRegion` and routes it to 
`socket.sendFile()`, restoring the zero-copy `sendfile()` path.
   
   **Benchmark results (File-Backed Shuffle Fetch) show dramatic improvement:**
   
   | Scenario | Before (ms) | After (ms) | Improvement |
   |---|---|---|---|
   | KQUEUE, sequential fetch (JDK8) | 2227 | 420 | **~5.3x faster** |
   | KQUEUE, parallel fetch (JDK8) | 1810 | 805 | **~2.4x faster** |
   
   ### Does this PR resolve a correctness bug?
   
   No.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. This is an internal optimization to the Netty transport layer. Users 
benefit from improved shuffle fetch performance when using native transports 
(the default on Linux) without any configuration changes.
   
   ### How was this patch tested?
   
   - Re-ran `NettyTransportBenchmark` with JDK8 to confirm the performance 
improvement. Updated benchmark result files accordingly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
