loserwang1024 opened a new issue, #2773:
URL: https://github.com/apache/fluss/issues/2773

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Description
   
   ### Off-Heap Memory Allocation in Flink
   Flink's default off-heap memory (taskmanager.memory.task.off-heap.size = 0 plus taskmanager.memory.framework.off-heap.size = 128MB) totals 128MB. This budget is shared between the framework (e.g., Flink's internal operations) and all third-party connectors, not just Fluss.
   
   From Fluss' perspective, the recommended batch size is 2–5MB (raw data). However, during deserialization and decompression, the memory requirement grows to 10–15MB per batch. If a TaskManager runs multiple slots, this demand multiplies, leading to severe off-heap memory contention.
   
   **This limited off-heap memory leaves insufficient space for Netty clients**.
   
   ### Netty Client Memory Constraints
   
   Fluss' default log.replica.fetch.max-bytes is 16MB. A Netty client connecting to 4 servers would require at least 64MB of off-heap memory (16MB × 4). While we could throw an error to force users to increase off-heap memory, this approach is not user-friendly. Instead, we prioritize graceful degradation: if off-heap memory is insufficient, the system should read data more slowly rather than failing abruptly.
   
   ###  Proposed Configuration: Heap-First Allocation
   
   I want to propose heap-first allocation for the Netty client:
   1. Flink's default heap memory is sufficiently large.
   2. In NettyClientHandler, the record is copied into heap memory anyway. So why not just use PreferHeapByteBufAllocator in the Netty client?

   <img width="983" height="307" alt="Image" src="https://github.com/user-attachments/assets/73d48e19-0c60-4ba5-addd-bbcd1a5a1ff6" />
   
   ```
   ------------------
   | network buffer |
   ------------------
           |
     read a chunk from the socket (epoll)
           |
   -----------------------
   | small direct buffer |
   -----------------------
           |
     ByteToMessageDecoder.COMPOSITE_CUMULATOR
           |
   -----------------------------------------
   | heap memory (the whole response body) |
   -----------------------------------------
   ```
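   The flow above could be wired roughly like this (a sketch, not actual Fluss code; the frame-decoder parameters and class name are placeholders):
   ```java
   import io.netty.bootstrap.Bootstrap;
   import io.netty.buffer.PooledByteBufAllocator;
   import io.netty.channel.ChannelInitializer;
   import io.netty.channel.ChannelOption;
   import io.netty.channel.PreferHeapByteBufAllocator;
   import io.netty.channel.socket.SocketChannel;
   import io.netty.handler.codec.ByteToMessageDecoder;
   import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

   public class HeapFirstClientSketch {
       static Bootstrap configure(Bootstrap bootstrap) {
           return bootstrap
                   // Hand out heap buffers for channel reads instead of direct ones.
                   .option(ChannelOption.ALLOCATOR,
                           new PreferHeapByteBufAllocator(PooledByteBufAllocator.DEFAULT))
                   .handler(new ChannelInitializer<SocketChannel>() {
                       @Override
                       protected void initChannel(SocketChannel ch) {
                           // Placeholder framing: 4-byte length prefix.
                           LengthFieldBasedFrameDecoder decoder =
                                   new LengthFieldBasedFrameDecoder(
                                           Integer.MAX_VALUE, 0, 4, 0, 4);
                           // Compose incoming fragments instead of copying them
                           // into one large contiguous cumulation buffer.
                           decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
                           ch.pipeline().addLast(decoder);
                       }
                   });
       }
   }
   ```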
   Introduce a new configuration:
   ```java
   public static final ConfigOption<Boolean> NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST =
           key("netty.client.allocator.heap-buffer-first")
                   .booleanType()
                   .defaultValue(true)
                   .withDescription(
                           "Whether to allocate heap buffer first for the netty client. "
                                   + "If set to false, direct buffer will be used first, "
                                   + "which requires sufficient off-heap memory to be available.");
   ```
   * Default (true): Prefer heap buffers to avoid off-heap memory exhaustion.
   * Advanced use case: users seeking performance improvements (e.g., reduced GC pressure) can set heap-buffer-first = false and manually increase Flink's off-heap memory.

   This design balances stability for default users with flexibility for advanced users.
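   If the option is adopted, consuming it could look like the fragment below (illustrative only; `conf` and the surrounding client-builder code are assumptions, not existing Fluss APIs):
   ```java
   // Hypothetical: choose the Netty allocator from the proposed option.
   ByteBufAllocator base = PooledByteBufAllocator.DEFAULT;
   ByteBufAllocator allocator =
           conf.get(NETTY_CLIENT_ALLOCATOR_HEAP_BUFFER_FIRST)
                   ? new PreferHeapByteBufAllocator(base) // default: heap-first
                   : base;                                // advanced: direct-first
   bootstrap.option(ChannelOption.ALLOCATOR, allocator);
   ```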
   
   ### Willingness to contribute
   
   - [x] I'm willing to submit a PR!

