loserwang1024 opened a new issue, #2773: URL: https://github.com/apache/fluss/issues/2773
### Search before asking - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar. ### Description ### Off-Heap Memory Allocation in Flink Flink's default off-heap memory (taskmanager.memory.task.off-heap.size = 0 + taskmanager.memory.framework.off-heap.size = 128MB) totals 128MB. This is shared between the framework (e.g., Flink's internal operations) and third-party connectors (not only just Fluss). From Fluss' perspective, the recommended batch size is 2–5MB (raw data). However, during deserialization and decompression, the memory requirement increases to 10–15MB per batch. If a task has multiple slots, this demand multiplies, leading to severe off-heap memory contention. **This limited off-heap memory leaves insufficient space for Netty clients**. ### Netty Client Memory Constraints Flink's default log.replica.fetch.max-bytes is 16MB.A Netty client connecting to 4 servers would require at least 64MB of off-heap memory (16MB × 4).While we could throw an error to force users to increase off-heap memory, this approach is not user-friendly. Instead, we prioritize graceful degradation: if off-heap memory is insufficient, the system should read data more slowly rather than failing abruptly. ### Proposed Configuration: Heap-First Allocation I want to propose Heap-First Allocation for netty: 1. Flink's default heap memory is sufficiently large 2. In NettyClientHandler, the record also will be copied as heap memory. Thus, why not just use PreferHeapByteBufAllocator in netty client(. <img width="983" height="307" alt="Image" src="https://github.com/user-attachments/assets/73d48e19-0c60-4ba5-addd-bbcd1a5a1ff6" />. ```java ----------------- | network buffer | ----------------- | epoll a piece of network buffer | ----------------- | little direct buffer | ------------------- | ByteToMessageDecoder.COMPOSITE_CUMULATOR | -------------------------------------------------- | heap memory( the whole responce body | ---------------------------------------------------- ```` Introduce a new configuration: ```java public static final ConfigOption<Boolean> NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST = key("netty.client.allocator.heap-buffer-first") .booleanType() .defaultValue(true) .withDescription( "Whether to allocate heap buffer first for the netty client. " + "If set to false, direct buffer will be used first, " + "which requires sufficient off-heap memory to be available."); ``` * Default (true): Prefer heap buffers to avoid off-heap memory exhaustion. * Advanced Use Case: Users seeking performance improvements (e.g., reduced GC pressure) can set heap-buffer-first = false and manually increase Flink's off-heap memory. * This design balances stability for default users and flexibility for advanced users. ### Willingness to contribute - [x] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
