SteNicholas opened a new pull request, #3649: URL: https://github.com/apache/celeborn/pull/3649
### What changes were proposed in this pull request?

`MessageEncoder` enables zero-copy sendfile for `FileRegion` in Netty native transports: when the body is a `FileRegion` backed by `FileSegmentManagedBuffer`, the header `ByteBuf` and the `FileRegion` are emitted as **separate objects** in the outbound message list, instead of being wrapped together in a `MessageWithHeader`.

Previously, all messages with a body were unconditionally wrapped in `MessageWithHeader`. This caused native transports (EPOLL, KQUEUE) to fall into a generic `FileRegion.transferTo()` fallback path that copies data through user space, bypassing the optimized `sendfile()` / `splice()` zero-copy path that Netty's native transports provide.

The split is applied only when the `ManagedBuffer` is a `FileSegmentManagedBuffer`, whose `release()` is a no-op, making it safe to emit the `FileRegion` independently of write lifecycle management. Other `ManagedBuffer` types (e.g., `BlockManagerManagedBuffer`) still use the `MessageWithHeader` wrapper because they perform resource cleanup in `release()` that must be tied to `MessageWithHeader.deallocate()`.

### Why are the changes needed?

When using native transports (AUTO/EPOLL on Linux), file-backed shuffle fetch performance was severely degraded compared to NIO mode. The root cause lies in how Netty's native transports dispatch `FileRegion` writes.
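The branching described above can be sketched as a minimal, self-contained model. Note that `EncoderSketch` and all nested types below are local stubs standing in for the real `MessageEncoder`, `MessageWithHeader`, and `FileSegmentManagedBuffer` classes; this illustrates the decision logic only, not the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the MessageEncoder change.
// All types are marker stubs for the real Celeborn/Netty classes.
class EncoderSketch {
    interface ManagedBuffer {}
    static class FileSegmentManagedBuffer implements ManagedBuffer {} // release() is a no-op
    static class OtherManagedBuffer implements ManagedBuffer {}       // release() frees resources

    static class Header {}           // stands in for the header ByteBuf
    static class FileRegionPart {}   // stands in for the body DefaultFileRegion
    static class MessageWithHeader { // stands in for the combined wrapper
        MessageWithHeader(Header header, FileRegionPart body) {}
    }

    // Emit header and body as separate outbound objects only for
    // file-segment buffers, so Netty sees a raw DefaultFileRegion
    // and can route it to sendfile(). Other buffer types stay wrapped
    // so their cleanup remains tied to MessageWithHeader.deallocate().
    static List<Object> encode(ManagedBuffer body) {
        Header header = new Header();
        FileRegionPart region = new FileRegionPart();
        List<Object> out = new ArrayList<>();
        if (body instanceof FileSegmentManagedBuffer) {
            out.add(header);   // written as a plain ByteBuf
            out.add(region);   // written as a DefaultFileRegion
        } else {
            out.add(new MessageWithHeader(header, region));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(encode(new FileSegmentManagedBuffer()).size()); // 2
        System.out.println(encode(new OtherManagedBuffer()).size());      // 1
    }
}
```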
In `AbstractEpollStreamChannel.doWriteSingle()` (and the analogous KQueue path), Netty uses an `instanceof` check to choose between two write strategies: https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L474-L493

```java
} else if (msg instanceof DefaultFileRegion) {
    return writeDefaultFileRegion(in, (DefaultFileRegion) msg); // → socket.sendFile() (zero-copy)
} else if (msg instanceof FileRegion) {
    return writeFileRegion(in, (FileRegion) msg); // → region.transferTo() (user-space copy)
}
```

- **`writeDefaultFileRegion()`** calls `socket.sendFile()`, which maps directly to the Linux `sendfile()` syscall — a true zero-copy path where data is transferred from the file page cache to the socket buffer entirely within the kernel, with no user-space copy. https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L367-L386

```java
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
    final long offset = region.transferred();
    final long regionCount = region.count();
    if (offset >= regionCount) {
        in.remove();
        return 0;
    }

    final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
    if (flushedAmount > 0) {
        in.progress(flushedAmount);
        if (region.transferred() >= regionCount) {
            in.remove();
        }
        return 1;
    } else if (flushedAmount == 0) {
        validateFileRegion(region, offset);
    }
    return WRITE_STATUS_SNDBUF_FULL;
}
```

- **`writeFileRegion()`** falls back to `region.transferTo(WritableByteChannel)`, which writes data through a `SocketWritableByteChannel` wrapper — effectively a user-space copy path.
https://github.com/netty/netty/blob/eeb5674526f0b49a142580686a5a9a7147ddadec/transport-classes-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java#L402-L420

```java
private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
    if (region.transferred() >= region.count()) {
        in.remove();
        return 0;
    }

    if (byteChannel == null) {
        byteChannel = new EpollSocketWritableByteChannel();
    }
    final long flushedAmount = region.transferTo(byteChannel, region.transferred());
    if (flushedAmount > 0) {
        in.progress(flushedAmount);
        if (region.transferred() >= region.count()) {
            in.remove();
        }
        return 1;
    }
    return WRITE_STATUS_SNDBUF_FULL;
}
```

Spark's `MessageWithHeader` extends `AbstractFileRegion` (not `DefaultFileRegion`). When `MessageEncoder` wraps a `DefaultFileRegion` body inside `MessageWithHeader`, the resulting object is a generic `FileRegion` from Netty's perspective. This means Netty dispatches it to the `writeFileRegion()` fallback, which calls `MessageWithHeader.transferTo()`:

```java
// MessageWithHeader.java, line 121
if (body instanceof FileRegion fileRegion) {
    writtenBody = fileRegion.transferTo(target, totalBytesTransferred - headerLength);
}
```

Here, even though the inner body is a `DefaultFileRegion`, its `transferTo()` is invoked with a `WritableByteChannel` (not a file descriptor), so the data is read from the file into a user-space buffer and then written to the socket — **the zero-copy opportunity is lost**.

By emitting the `DefaultFileRegion` directly into Netty's outbound buffer (instead of wrapping it in `MessageWithHeader`), Netty's native transport recognizes it via `instanceof DefaultFileRegion` and routes it to `socket.sendFile()`, restoring the zero-copy `sendfile()` path.
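The type relationship that defeats the dispatch can be illustrated with local stand-in classes (the names below mirror the Netty and Spark types, but they are stubs written for this sketch, not the real classes):

```java
// Hypothetical stubs showing why wrapping a DefaultFileRegion in
// MessageWithHeader changes which write path Netty's dispatch selects.
class DispatchSketch {
    static class FileRegion {}
    static class DefaultFileRegion extends FileRegion {}
    static class AbstractFileRegion extends FileRegion {}

    // MessageWithHeader extends AbstractFileRegion, not DefaultFileRegion,
    // so the wrapped object never matches `instanceof DefaultFileRegion`.
    static class MessageWithHeader extends AbstractFileRegion {
        final FileRegion body;
        MessageWithHeader(FileRegion body) { this.body = body; }
    }

    // Mirrors the instanceof chain in AbstractEpollStreamChannel.doWriteSingle().
    static String dispatch(Object msg) {
        if (msg instanceof DefaultFileRegion) {
            return "sendfile";   // zero-copy kernel path
        }
        if (msg instanceof FileRegion) {
            return "transferTo"; // user-space copy fallback
        }
        return "other";
    }

    public static void main(String[] args) {
        DefaultFileRegion body = new DefaultFileRegion();
        System.out.println(dispatch(new MessageWithHeader(body))); // transferTo
        System.out.println(dispatch(body));                        // sendfile
    }
}
```

The wrapper hides the body's concrete type from the dispatch, even though the body itself would qualify for the zero-copy branch; emitting the body unwrapped is what restores it.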
**Benchmark results (File-Backed Shuffle Fetch) show dramatic improvement:**

| Scenario | Before (ms) | After (ms) | Improvement |
|---|---|---|---|
| KQUEUE, sequential fetch (JDK8) | 2227 | 420 | **~5.3x faster** |
| KQUEUE, parallel fetch (JDK8) | 1810 | 805 | **~2.4x faster** |

### Does this PR resolve a correctness bug?

No.

### Does this PR introduce _any_ user-facing change?

No. This is an internal optimization to the Netty transport layer. Users benefit from improved shuffle fetch performance when using native transports (the default on Linux) without any configuration changes.

### How was this patch tested?

- Re-ran `NettyTransportBenchmark` with JDK8 to confirm the performance improvement. Updated benchmark result files accordingly.
