[ 
https://issues.apache.org/jira/browse/FLINK-26762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524271#comment-17524271
 ] 

Anton Kalashnikov commented on FLINK-26762:
-------------------------------------------

First of all, no rush is needed to implement this idea with 
overdraft-memory-size. Let's check your current implementation and then we can 
discuss whether it makes sense or not to use overdraft-memory-size rather than 
overdraft-buffers.

??Assuming that flatmap can produce 1Mb data at one time, they belong to 
different channels, do we apply for overdraft-buffer according to the actual 
data size???

I suppose we should consider the worst-case when all records will be sent to 
one channel(so for the user this property will be something like `max possible 
value at one time for one channel`)

??If the data is 100 bytes, do we apply for a memory segment of 100 bytes???
??If the same Subpartition has 5 records, we will apply for 5 100byte memory 
segments. Maybe they don't perform well and take up more credit downstream???

We should not change our internal logic of memory segment size or buffers 
number. I suppose the implementation should just take the closest possible 
buffers number. For example, if the user request 100 Kb as overdraft but our 
memory segment size is 32Kb then we take 4 buffers (128 bytes) as overdraft. 
The second implementation idea we always request to set this value to a 
multiple of the buffer size(so 100 Kb is not possible but 128 or 96 is 
possible).
It is one more reason why I don't still sure that this idea is good since on 
the one hand, it is transparent for the user(the user should not think about 
buffers number and etc.) but on the other hand, Flink should not execute this 
request blindly and it should adapt this value to internal implementation(which 
is not so transparent for user).

But if we merge this idea with buffer debloating I think it will work pretty 
great.

??If = -1, flink automatically infers and will use the number of Subpartitions 
as overdraft-buffers. Not only can cover flatmap, window, join scenes, but also 
cover scenes broadcast by Watermark. It will make users more convenient to 
use.??

I don't really get what it means `the number of Subpartitions as 
overdraft-buffers`. `the number of Subpartitions ` is roughly equal to 
parallelism(depends on exchange) and I don't really understand how it 
correlates to overdraft?

> Add the overdraft buffer in BufferPool to reduce unaligned checkpoint being 
> blocked
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-26762
>                 URL: https://issues.apache.org/jira/browse/FLINK-26762
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Network
>    Affects Versions: 1.13.0, 1.14.0, 1.15.0
>            Reporter: fanrui
>            Assignee: fanrui
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0
>
>         Attachments: image-2022-04-18-11-45-14-700.png, 
> image-2022-04-18-11-46-03-895.png
>
>
> In some past JIRAs of Unaligned Checkpoint, the community has added the  
> recordWriter.isAvaliable() to reduce block for single record write. But for 
> large record, flatmap or broadcast watermark, they may need more buffer.
> Can we add the overdraft buffer in BufferPool to reduce unaligned checkpoint 
> being blocked? 
> h2. Overdraft Buffer mechanism
> Add the configuration of 
> 'taskmanager.network.memory.overdraft-buffers-per-gate=5'. 
> When requestMemory is called and the bufferPool is insufficient, the 
> bufferPool will allow the Task to overdraw up to 5 MemorySegments. And 
> bufferPool will be unavailable until all overdrawn buffers are consumed by 
> downstream tasks. Then the task will wait for bufferPool being available.
> From the above, we have the following benefits:
>  * For scenarios that require multiple buffers, the Task releases the 
> Checkpoint lock, so the Unaligned Checkpoint can be completed quickly.
>  * We can control the memory usage to prevent memory leak.
>  * It just needs a litter memory, and can improve the stability of the Task 
> under back pressure.
>  * Users can increase the overdraft-buffers to adapt the scenarios that 
> require more buffers.
>  
> Masters, please correct me if I'm wrong, thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to