[ 
https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16888788#comment-16888788
 ] 

zhijiang commented on FLINK-13245:
----------------------------------

Thanks for these further thoughts, [~azagrebin]. I think the discussion will make 
things clearer.

I understand your concerns above. I agree that the current semantics of 
`CancelPartitionRequest/CloseRequest` are not precise, because these messages 
could indicate either successful consumption on the consumer side, or a consumer 
task failure or any other exception during consumption.

Regarding the concern of when to call `notifySubpartitionConsumed`: the current 
implementation is based on whether the producer receives a confirmable 
notification (`CancelPartitionRequest`/`CloseRequest`) from the consumer side. 
If it receives either message, it calls `notifySubpartitionConsumed`, no matter 
whether the consumer finished or failed. A channel exception only happens 
locally on the consumer side, so in that case `notifySubpartitionConsumed` is 
not called. Likewise, when the channel becomes inactive, the producer cannot 
distinguish whether the consumer closed the connection intentionally or the TM 
was lost exceptionally, so it does not call `notifySubpartitionConsumed` either.
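
To make the above concrete, here is a minimal, hypothetical sketch (not actual 
Flink code; the event names are illustrative) of when the producer side would 
call `notifySubpartitionConsumed`:

```java
// Hypothetical sketch, not actual Flink code: models when the producer calls
// notifySubpartitionConsumed, per the description above.
public class ConsumedNotificationSketch {

    // Events the producer-side network handler can observe for a consumer.
    public enum ConsumerEvent {
        CANCEL_PARTITION_REQUEST, // explicit message from the consumer
        CLOSE_REQUEST,            // explicit message from the consumer
        CHANNEL_EXCEPTION,        // happens only locally on the consumer side
        CHANNEL_INACTIVE          // intentional close vs. TM loss is indistinguishable
    }

    // Returns true iff the producer would call notifySubpartitionConsumed.
    public static boolean notifiesConsumed(ConsumerEvent event) {
        switch (event) {
            case CANCEL_PARTITION_REQUEST:
            case CLOSE_REQUEST:
                // A confirmable notification arrived from the consumer side,
                // no matter whether the consumer finished or failed.
                return true;
            default:
                // No confirmation reached the producer: do not notify.
                return false;
        }
    }

    public static void main(String[] args) {
        for (ConsumerEvent e : ConsumerEvent.values()) {
            System.out.println(e + " -> notifyConsumed=" + notifiesConsumed(e));
        }
    }
}
```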

For the first concern, we could introduce messages with clearly defined 
semantics of successful/failed consumption to replace the current 
`Cancel/ClosePartition` messages. For the second concern, we could also consider 
the proper way to handle messages/exceptions/channel inactivity. One 
precondition, however, is that we first agree on the specific semantics of 
consumption-based partition release in the partition management feature.

Currently there are three strategies that can release a partition:
 * partition release based on consumer confirmation via the network
 * partition release based on JM notification
 * partition release upon disconnection between TM and JM
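
For illustration, the three strategies above could be modeled as triggers 
feeding a single release path; a hedged sketch with invented names, not Flink's 
actual API:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: the three release paths listed above, modeled as
// triggers on one tracker. All names are illustrative, not Flink's API.
public class ReleaseStrategySketch {

    public enum ReleaseTrigger {
        CONSUMER_CONFIRMATION, // network message from the consumer side
        JM_NOTIFICATION,       // explicit release notification from the JM
        TM_JM_DISCONNECTION    // safety net when the TM/JM connection is lost
    }

    private final Set<String> released = new HashSet<>();

    // Any of the three triggers releases the partition; a repeated release
    // for the same partition is a no-op. Returns true on the first release.
    public boolean release(String partitionId, ReleaseTrigger trigger) {
        return released.add(partitionId);
    }

    public boolean isReleased(String partitionId) {
        return released.contains(partitionId);
    }
}
```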

For the first strategy (partition release based on consumer confirmation): 
 * We could define the network message as `ReleasePartition` instead of the 
current `Cancel/ClosePartition`. Then the producer would not need to care 
whether the consumer finished or failed during consumption. The precondition for 
this approach is a reliable network notification, but we currently have no ack 
mechanism for such a message in the application layer. Moreover, if a consumer 
task fails before establishing the connection with the producer, we still need 
to rely on JM notification to release the producer's partitions.
 * Alternatively, we could keep the current behavior without a specific 
semantic; the current strategy is only coupled with the existing mechanisms in 
the network stack.
 * Or the semantic could be defined clearly as partition release after one 
successful consumption. In terms of implementation, this could be done via both 
consumer notification and JM notification.
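
On the missing ack mechanism mentioned in the first bullet: a minimal sketch of 
what an application-layer ack with retries could look like, assuming a 
hypothetical `ReleasePartition` message (illustrative only; no such ack exists 
in the current stack):

```java
// Hypothetical sketch of an application-layer ack for a ReleasePartition
// message; message loss is simulated by dropping the first `drops` sends.
public class ReleaseAckSketch {

    // Producer side: releases on the first ReleasePartition and always acks.
    public static class Producer {
        public boolean released = false;

        public boolean onReleasePartition() {
            released = true;
            return true; // ack back to the consumer
        }
    }

    // Consumer side: resend ReleasePartition until an ack arrives or the
    // attempt budget is exhausted.
    public static boolean sendWithRetry(Producer producer, int drops, int maxAttempts) {
        int remainingDrops = drops;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (remainingDrops-- > 0) {
                continue; // message lost in flight, retry
            }
            return producer.onReleasePartition(); // delivered, ack received
        }
        return false; // give up; fall back to JM-driven release
    }
}
```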

In general, I think we should first define the different release strategies with 
specific semantics, without worrying about implementation details while 
designing the strategies; any strategy can be implemented in multiple ways. 
E.g., the semantics might be divided into at-least-once, exactly-once, and 
at-most-once consumption. Once we agree on the specific semantics, we will know 
how to refactor the current network stack to implement a given strategy.
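
As one illustration of those proposed semantics (my own reading, not a settled 
design): under at-most-once, a partition could be released even after a failed 
consumption attempt, while at-least-once/exactly-once would require keeping the 
data for a retry. A sketch:

```java
// Hypothetical sketch: how the proposed consumption semantics could constrain
// partition release after a failed consumption attempt. Illustrative only.
public class ConsumptionSemanticsSketch {

    public enum Semantics { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    // Under at-most-once the data need not survive a failed attempt; under
    // at-least-once/exactly-once the producer must keep it for a retry.
    public static boolean mayReleaseAfterFailedAttempt(Semantics s) {
        return s == Semantics.AT_MOST_ONCE;
    }
}
```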

It might be worth further re-architecting the partition release strategy in 
release-1.10, because it is difficult to extend the current architecture with 
another strategy for the interactive queries feature. In order not to block the 
current release, I think the existing modifications are enough to solve the file 
leak issue.

> Network stack is leaking files
> ------------------------------
>
>                 Key: FLINK-13245
>                 URL: https://issues.apache.org/jira/browse/FLINK-13245
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.9.0
>            Reporter: Chesnay Schepler
>            Assignee: zhijiang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows a large 
> number of {{.channel}} files continue to reside in a 
> {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a 
> {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses 
> ref-counting to ensure we don't release data while a reader is still present. 
> However, at the end of the job this count has not reached 0, and thus nothing 
> is being released.
> The same issue is also present on the {{ResultPartition}} level; the 
> {{ReleaseOnConsumptionResultPartition}} is also being released while the 
> ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for 
> partitions being consumed.
> It is plausible that this issue has recently caused problems on Travis, where 
> builds were failing due to a lack of disk space.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
