[ https://issues.apache.org/jira/browse/FLINK-2296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14611715#comment-14611715 ]
Stephan Ewen commented on FLINK-2296: ------------------------------------- BTW: Are you talking about the actual state checkpointing, or about the commit notifications? > Checkpoint committing broken > ---------------------------- > > Key: FLINK-2296 > URL: https://issues.apache.org/jira/browse/FLINK-2296 > Project: Flink > Issue Type: Bug > Components: Streaming > Affects Versions: 0.10 > Reporter: Robert Metzger > Assignee: Robert Metzger > Priority: Blocker > > While working on fixing the failing {{PersistentKafkaSource}} test, I > realized that the recent changes introduced in "New operator state > interfaces" https://github.com/apache/flink/pull/747 (sadly, there is no JIRA > for this huge change) introduced some changes that I was not aware of. > * The {{CheckpointCoordinator}} is now sending the StateHandle back to the > TaskManager when confirming a checkpoint. For the non-FS case, this means > that for checkpoint committed operators, the state is send twice over the > wire for each checkpoint. > For the FS case, this means that for every checkpoint commit, the state needs > to be retrieved from the file system. > Did you conduct any tests on a cluster to measure the performance impact of > that? > I see three approaches for fixing the aforementioned issue: > - keep it this way (probably poor performance) > - always keep the state for uncommitted checkpoints in the TaskManager's > memory. Therefore, we need to come up with a good eviction strategy. I don't > know the implications for large state. > - change the interface and do not provide the state to the user function > (=old behavior). This forces users to think about how they want to keep the > state (but it is also a bit more work for them) > I would like to get some feedback on how to solve this issue! > Also, I discovered the following bugs: > * Non-source tasks didn't get {{commitCheckpoint}} calls, even though they > implemented the {{CheckpointCommitter}} interface. I fixed this issue in my > current branch. > * The state passed to the {{commitCheckpoint}} method did not match with the > subtask id. So user functions were receiving states from other parallel > instances. This lead to faulty behavior in the KafkaSource (thats also the > reason why the KafkaITCase was failing more frequently ...). I fixed this > issue in my current branch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)