Hi all,

In Flink 1.12 we introduced the TM merge shuffle. However, its
out-of-the-box experience is not very good. The main reason is that the
default configuration leads users to run into OOM errors [1]. We therefore
propose introducing a managed memory pool for TM merge shuffle to avoid
this problem.

Goals

   1. Don't affect streaming and pipelined-shuffle-only batch setups.
   2. Don't mix memory with different life cycles in the same pool, e.g.,
   write buffers needed by running tasks and read buffers that are still
   needed after tasks have finished.
   3. Users can use the TM merge shuffle with the default memory
   configuration. (Further tuning may be needed for performance, but jobs
   should not fail with the default configuration.)

Proposal

   1. Introduce a configuration option `taskmanager.memory.network.batch-read`
   to specify the size of this memory pool. The default value is 16m.
   2. Allocate the pool lazily, i.e., the memory pool is allocated when the
   TM merge shuffle is used for the first time.
   3. The pool size will not be added to the TM's total memory size, but
   will be considered part of `taskmanager.memory.framework.off-heap.size`. We
   need to check that the pool size is not larger than the framework off-heap
   size if TM merge shuffle is enabled.
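
To make items 2 and 3 concrete, here is a rough sketch in Python (not Flink code). The class name, its parameters, and the use of `bytearray` as a stand-in for an off-heap allocation are all assumptions for illustration only.

```python
# Illustrative sketch only; these names are hypothetical, not Flink APIs.
MIB = 1024 * 1024


class BatchReadBufferPool:
    """Pool that is validated up front but allocated only on first use."""

    def __init__(self, pool_size, framework_off_heap_size, merge_shuffle_enabled=True):
        # Item 3: the pool is accounted as part of framework off-heap memory,
        # so it must not be larger than that budget when the shuffle is enabled.
        if merge_shuffle_enabled and pool_size > framework_off_heap_size:
            raise ValueError(
                "taskmanager.memory.network.batch-read (%d bytes) is larger than "
                "taskmanager.memory.framework.off-heap.size (%d bytes)"
                % (pool_size, framework_off_heap_size)
            )
        self.pool_size = pool_size
        self._buffer = None  # nothing is allocated at construction time

    @property
    def allocated(self):
        return self._buffer is not None

    def get(self):
        # Item 2: allocate lazily, the first time the shuffle needs the pool.
        if self._buffer is None:
            # bytearray is only a stand-in for an off-heap allocation.
            self._buffer = bytearray(self.pool_size)
        return self._buffer


pool = BatchReadBufferPool(16 * MIB, 128 * MIB)  # the proposed defaults
print(pool.allocated)  # False: no memory is taken until first use
pool.get()
print(pool.allocated)  # True
```

A streaming job that never calls `get()` never pays for the pool, which is how the sketch reflects goal 1.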


With these defaults, allocating the memory pool is very unlikely to fail.
The default framework off-heap memory is currently 128m, which is mainly
used by Netty. Since we introduced zero copy, this usage has been reduced;
see [2] for the detailed data.

Known Limitations

Usability for increasing the memory pool size

In addition to increasing `taskmanager.memory.network.batch-read`, the user
may also need to increase `taskmanager.memory.framework.off-heap.size` at
the same time. If the user forgets to do so, the check performed when
allocating the memory pool is likely to fail.


So in the following two situations, we will prompt the user to increase
`taskmanager.memory.framework.off-heap.size`:

   1. `taskmanager.memory.network.batch-read` is larger than
   `taskmanager.memory.framework.off-heap.size`.
   2. Allocating the pool fails with an OOM.
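
As a rough illustration of how both situations could lead to the same prompt, here is a small Python sketch. The function name, the message wording, and the pluggable `allocate` parameter are hypothetical and only meant to show the control flow.

```python
# Illustrative sketch only; function and parameter names are hypothetical.
HINT = "Consider increasing taskmanager.memory.framework.off-heap.size."


def allocate_or_hint(pool_size, framework_off_heap_size, allocate=bytearray):
    """Return (buffer, None) on success, or (None, message) in either failure case."""
    # Situation 1: the configured pool does not fit into framework off-heap memory.
    if pool_size > framework_off_heap_size:
        return None, "Configured batch-read pool exceeds framework off-heap memory. " + HINT
    # Situation 2: the configuration looks fine, but the allocation itself runs out of memory.
    try:
        return allocate(pool_size), None
    except MemoryError:
        return None, "Out of memory while allocating the batch-read pool. " + HINT
```

Either way the user sees the same hint, so they do not need to know which of the two checks failed.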


An alternative is that when the user adjusts the size of the memory pool,
the system automatically adjusts the framework off-heap size accordingly.
But we are not entirely sure about this, given that it is implicit and
would complicate the memory configuration.

Potential memory waste

In the first step, the memory pool will not be released once allocated. This
means that even if no batch job follows, the pooled memory cannot be used
by other consumers.


We are not releasing the pool in the first step due to the concern that
frequently allocating and deallocating the entire pool may increase GC
pressure. Investigating how to dynamically release the pool when it is no
longer needed is considered a future follow-up.


Looking forward to your feedback.



[1] https://issues.apache.org/jira/browse/FLINK-20740

[2] https://github.com/apache/flink/pull/7368

Best,
Guowei
