[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553994#comment-14553994 ]
ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/674#discussion_r30787324

    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---
    @@ -225,7 +226,16 @@ public void close() {
     
     	@Override
     	public void restoreState(long[] state) {
    -		// we maintain the offsets in Kafka, so nothing to do.
    +		if(lastOffsets == null) {
    +			LOG.warn("Restore state called before open() has been called");
    +			return;
    +		}
    +		LOG.info("Restoring state to {}", Arrays.toString(state));
    +		if(lastOffsets.length != state.length) {
    --- End diff --
    
    How about sanity checking before logging that things are going to happen? Usually gives better logging insights...


> Memory leak in presence of failed checkpoints in KafkaSource
> -------------------------------------------------------------
>
>                 Key: FLINK-2004
>                 URL: https://issues.apache.org/jira/browse/FLINK-2004
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Robert Metzger
>            Priority: Critical
>             Fix For: 0.9
>
>
> Checkpoints that fail never send a commit message to the tasks.
> Maintaining a map of all pending checkpoints introduces a memory leak, as entries for failed checkpoints will never be removed.
> Approaches to fix this:
> - The source cleans up entries from older checkpoints once a checkpoint is committed (simple implementation in a linked hash map)
> - The commit message could include the optional state handle (the source need not maintain the map)
> - The checkpoint coordinator could send messages for failed checkpoints?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
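
Applying the review suggestion above, a minimal sketch of how restoreState() could order its checks is shown below. This is only an illustration: the field names (lastOffsets, LOG) are taken from the diff, while the exception thrown on a length mismatch and the final array copy are assumptions about what the rest of the method does, not the actual Flink code.

    import java.util.Arrays;

    @Override
    public void restoreState(long[] state) {
    	if (lastOffsets == null) {
    		LOG.warn("Restore state called before open() has been called");
    		return;
    	}
    	// Sanity check first, so the log line below only reports a restore
    	// that will actually be applied (assumption: a mismatch is a hard error).
    	if (lastOffsets.length != state.length) {
    		throw new IllegalArgumentException("Restored state has " + state.length
    				+ " offsets, but this source tracks " + lastOffsets.length + " partitions");
    	}
    	LOG.info("Restoring state to {}", Arrays.toString(state));
    	// Assumption: the restored offsets simply replace the current ones.
    	System.arraycopy(state, 0, lastOffsets, 0, state.length);
    }

Logging only after the validation means a failed restore surfaces as an exception carrying the mismatch details, rather than a misleading "Restoring state ..." entry followed by a failure.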