Hi, all

In Flink 1.12 we introduced the TM merge shuffle, but its out-of-the-box experience is not very good. The main reason is that with the default configuration users frequently run into OOM [1]. We therefore propose introducing a managed memory pool for the TM merge shuffle to avoid this problem.

Goals

1. Don't affect streaming and pipelined-shuffle-only batch setups.
2. Don't mix memory with different life cycles in the same pool, e.g., write buffers needed by running tasks and read buffers needed even after tasks have finished.
3. Users can use the TM merge shuffle with the default memory configuration. (Further tuning may be needed for performance, but jobs should not fail with the defaults.)

Proposal

1. Introduce a configuration option `taskmanager.memory.network.batch-read` to specify the size of this memory pool. The default value is 16m.
2. Allocate the pool lazily, i.e., the memory pool is allocated when the TM merge shuffle is used for the first time.
3. The pool size will not be added to the TM's total memory size, but will be considered part of `taskmanager.memory.framework.off-heap.size`. If TM merge shuffle is enabled, we need to check that the pool size is not larger than the framework off-heap size.

With this default configuration, allocating the memory pool is very unlikely to fail. Currently the default framework off-heap memory is 128m, which is mainly used by Netty. After we introduced zero copy, its usage has been reduced; see [2] for detailed data.

Known Limitations

Usability for increasing the memory pool size

In addition to increasing `taskmanager.memory.network.batch-read`, the user may also need to increase `taskmanager.memory.framework.off-heap.size` at the same time. If the user forgets this, the check is likely to fail when the memory pool is allocated.

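
To make the lazy allocation and the size check from the proposal more concrete, here is a rough sketch in plain Java. It is only an illustration under assumptions: the class `BatchReadPool` and its methods are hypothetical names, not existing Flink code, and a single direct `ByteBuffer` stands in for whatever buffer structure the real pool would use.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the proposed pool; not Flink's actual implementation. */
final class BatchReadPool {
    private final long poolBytes;             // value of taskmanager.memory.network.batch-read
    private final long frameworkOffHeapBytes; // value of taskmanager.memory.framework.off-heap.size
    private ByteBuffer pool;                  // stays null until the shuffle is first used

    BatchReadPool(long poolBytes, long frameworkOffHeapBytes) {
        this.poolBytes = poolBytes;
        this.frameworkOffHeapBytes = frameworkOffHeapBytes;
    }

    /** Rejects a pool that would not fit into the framework off-heap budget. */
    static void checkSizes(long poolBytes, long frameworkOffHeapBytes) {
        if (poolBytes > frameworkOffHeapBytes) {
            throw new IllegalArgumentException(
                "taskmanager.memory.network.batch-read (" + poolBytes + " bytes) is larger than "
                    + "taskmanager.memory.framework.off-heap.size (" + frameworkOffHeapBytes
                    + " bytes); please increase the framework off-heap size.");
        }
    }

    /** Allocates the pool on first call and returns the same buffer afterwards. */
    synchronized ByteBuffer getOrAllocate() {
        if (pool == null) {
            checkSizes(poolBytes, frameworkOffHeapBytes);
            try {
                // Simplification: one direct buffer, so the size must fit into an int.
                pool = ByteBuffer.allocateDirect(Math.toIntExact(poolBytes));
            } catch (OutOfMemoryError e) {
                throw new IllegalStateException(
                    "Could not allocate the batch-read pool; please increase "
                        + "taskmanager.memory.framework.off-heap.size.", e);
            }
        }
        return pool;
    }

    synchronized boolean isAllocated() {
        return pool != null;
    }
}
```

With the proposed defaults, `new BatchReadPool(16L << 20, 128L << 20)` allocates nothing at construction time, and the first `getOrAllocate()` call reserves the 16m. In this sketch the pool is never released, which matches the first step of the proposal.
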
So in the following two situations, we will still prompt the user to increase `taskmanager.memory.framework.off-heap.size`:

1. `taskmanager.memory.network.batch-read` is bigger than `taskmanager.memory.framework.off-heap.size`.
2. Allocating the pool encounters an OOM.

An alternative is for the system to automatically adjust the framework off-heap size when the user adjusts the size of the memory pool. But we are not entirely sure about this, given its implicitness and the complication it adds to the memory configuration.

Potential memory waste

In the first step, the memory pool will not be released once allocated. This means that even if there is no subsequent batch job, the pooled memory cannot be used by other consumers. We are not releasing the pool in the first step out of concern that frequently allocating/deallocating the entire pool may increase GC pressure. Investigating how to dynamically release the pool when it is no longer needed is left as a future follow-up.

Looking forward to your feedback.

[1] https://issues.apache.org/jira/browse/FLINK-20740
[2] https://github.com/apache/flink/pull/7368

Best,
Guowei