[ 
https://issues.apache.org/jira/browse/FLINK-2296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608364#comment-14608364
 ] 

Gyula Fora commented on FLINK-2296:
-----------------------------------

+1

> Checkpoint committing broken
> ----------------------------
>
>                 Key: FLINK-2296
>                 URL: https://issues.apache.org/jira/browse/FLINK-2296
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Blocker
>
> While working on fixing the failing {{PersistentKafkaSource}} test, I 
> realized that the recent "New operator state interfaces" pull request 
> https://github.com/apache/flink/pull/747 (sadly, there is no JIRA for this 
> huge change) introduced some changes that I was not aware of.
> * The {{CheckpointCoordinator}} is now sending the StateHandle back to the 
> TaskManager when confirming a checkpoint. For the non-FS case, this means 
> that for operators that commit checkpoints, the state is sent twice over the 
> wire for each checkpoint. 
> For the FS case, this means that for every checkpoint commit, the state needs 
> to be retrieved from the file system.
> Did you conduct any tests on a cluster to measure the performance impact of 
> that?
> I see three approaches for fixing the aforementioned issue:
> - keep it this way (probably poor performance)
> - always keep the state for uncommitted checkpoints in the TaskManager's 
> memory. Therefore, we need to come up with a good eviction strategy. I don't 
> know the implications for large state.
> - change the interface and do not provide the state to the user function 
> (=old behavior). This forces users to think about how they want to keep the 
> state (but it is also a bit more work for them)
> I would like to get some feedback on how to solve this issue!
> Also, I discovered the following bugs:
> * Non-source tasks didn't get {{commitCheckpoint}} calls, even though they 
> implemented the {{CheckpointCommitter}} interface. I fixed this issue in my 
> current branch.
> * The state passed to the {{commitCheckpoint}} method did not match the 
> subtask id, so user functions were receiving states from other parallel 
> instances. This led to faulty behavior in the KafkaSource (that's also the 
> reason why the KafkaITCase was failing more frequently ...). I fixed this 
> issue in my current branch.
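
To make the second approach above (keeping the state of uncommitted checkpoints in the TaskManager's memory) more concrete, here is a rough sketch of what a bounded cache with a simple eviction strategy could look like. All names ({{PendingCheckpointStateCache}}, {{put}}, {{commit}}, {{maxPending}}) are hypothetical and not part of Flink; the eviction policy (drop the oldest pending checkpoint first) is only one possible choice and says nothing about how large state would behave.

```java
import java.util.TreeMap;

/**
 * Hypothetical helper, NOT part of Flink: keeps the state of checkpoints
 * that were snapshotted but not yet committed, bounded by maxPending.
 */
final class PendingCheckpointStateCache<S> {

    private final int maxPending;
    // Sorted by checkpoint id so the oldest pending checkpoint is first.
    private final TreeMap<Long, S> pending = new TreeMap<>();

    PendingCheckpointStateCache(int maxPending) {
        if (maxPending < 1) {
            throw new IllegalArgumentException("maxPending must be >= 1");
        }
        this.maxPending = maxPending;
    }

    /** Remember the state taken for a checkpoint; evict the oldest if over capacity. */
    synchronized void put(long checkpointId, S state) {
        pending.put(checkpointId, state);
        while (pending.size() > maxPending) {
            pending.pollFirstEntry();
        }
    }

    /**
     * Return the locally kept state for a confirmed checkpoint, or null if it
     * was evicted (the caller would then have to fetch it some other way).
     * The committed checkpoint and all older pending ones are dropped.
     */
    synchronized S commit(long checkpointId) {
        S state = pending.get(checkpointId);
        pending.headMap(checkpointId, true).clear();
        return state;
    }

    synchronized int size() {
        return pending.size();
    }
}
```

With something like this, the commit message from the {{CheckpointCoordinator}} would only need to carry the checkpoint id, and the state would be sent back (or read from the file system) only when {{commit}} returns null.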



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
