[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567144#comment-14567144 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/674#issuecomment-107392647

    Looks good! Will merge this..

> Memory leak in presence of failed checkpoints in KafkaSource
> ------------------------------------------------------------
>
>                 Key: FLINK-2004
>                 URL: https://issues.apache.org/jira/browse/FLINK-2004
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Robert Metzger
>            Priority: Critical
>             Fix For: 0.9
>
> Checkpoints that fail never send a commit message to the tasks. Maintaining a map of all pending checkpoints introduces a memory leak, as entries for failed checkpoints will never be removed.
> Approaches to fix this:
> - The source cleans up entries from older checkpoints once a checkpoint is committed (simple implementation with a linked hash map)
> - The commit message could include the optional state handle (the source then need not maintain the map)
> - The checkpoint coordinator could send messages for failed checkpoints?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
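The first approach from the issue description (prune entries for older checkpoints once a newer one is committed) can be sketched roughly as follows. This is an illustrative sketch only, not the code from the PR; the class and method names are hypothetical. It relies on checkpoint ids being monotonically increasing, so on commit of id N every pending entry with id <= N is either N itself or an older, failed checkpoint that can safely be dropped.

```java
import java.util.TreeMap;

// Hypothetical sketch of the "prune on commit" fix: entries for failed
// checkpoints are removed as a side effect of the next successful commit,
// so the pending map can never grow without bound.
public class PendingCheckpointsSketch {

    // Ordered by checkpoint id, so older (possibly failed) entries come first.
    private final TreeMap<Long, long[]> pendingCheckpoints = new TreeMap<>();

    // Called when a checkpoint is taken: remember the offsets for this id.
    public void snapshot(long checkpointId, long[] offsets) {
        pendingCheckpoints.put(checkpointId, offsets);
    }

    // Called when a checkpoint is committed. Returns the offsets to commit,
    // or null if the id is unknown (e.g. it was already pruned).
    public long[] commit(long checkpointId) {
        long[] offsets = pendingCheckpoints.get(checkpointId);
        // Drop this entry and every older one: older entries can only belong
        // to checkpoints that failed and will never be committed.
        pendingCheckpoints.headMap(checkpointId, true).clear();
        return offsets;
    }

    public int pendingCount() {
        return pendingCheckpoints.size();
    }
}
```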
[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567475#comment-14567475 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/674
[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14560713#comment-14560713 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/674#issuecomment-105846281

    What is the status of this pull request? Any pending changes, or is it blocked by anything?
[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14561122#comment-14561122 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/674#issuecomment-105951860

    I have those unit-test-style tests all in the KafkaITCase because they depend on the testing clusters started for the test. They all need at least a running Zookeeper instance. I can of course put them into a different class, but that would further slow down our tests because we would spend more time starting and stopping Zookeeper.
    I've reworked the restore to reflect the open() / restoreState() order. The PR has been updated.
[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14561037#comment-14561037 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/674#issuecomment-105926083

    Sorry for not reacting. I was busy with other stuff, but I'll now address your comments.
[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553987#comment-14553987 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/674#discussion_r30787124

    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---
    @@ -236,13 +246,22 @@ public void restoreState(long[] state) {
     	@Override
     	public void commitCheckpoint(long checkpointId) {
     		LOG.info("Commit checkpoint {}", checkpointId);
    -		long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
    -		if(checkpointOffsets == null) {
    +		final int posInMap = pendingCheckpoints.indexOf(checkpointId);
    +		if(posInMap == -1) {
     			LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
     			return;
     		}
    +
    +		long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
     		LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
    --- End diff --

    I think this log message can be improved, to something like "Committing offsets {} to Kafka".
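The lookup-and-remove logic in the diff above can be modeled in isolation roughly like this. This is a simplified, hypothetical stand-in (a plain list of id/offsets pairs instead of the map-like structure the PR uses), not Flink's actual code; names like CommitPathModel are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the commit path in the diff: find the pending entry by
// checkpoint id, warn-and-return when it is missing, otherwise remove it by
// position and hand back the offsets to commit.
public class CommitPathModel {

    static final class Pending {
        final long id;
        final long[] offsets;
        Pending(long id, long[] offsets) {
            this.id = id;
            this.offsets = offsets;
        }
    }

    private final List<Pending> pendingCheckpoints = new ArrayList<>();

    public void addPending(long checkpointId, long[] offsets) {
        pendingCheckpoints.add(new Pending(checkpointId, offsets));
    }

    // Returns the offsets to commit, or null when the id is unknown
    // (the PR logs "Unable to find pending checkpoint for id {}" here).
    public long[] commitCheckpoint(long checkpointId) {
        int posInMap = indexOf(checkpointId);
        if (posInMap == -1) {
            return null;
        }
        // Corresponds to the suggested log line "Committing offsets {} to Kafka".
        return pendingCheckpoints.remove(posInMap).offsets;
    }

    private int indexOf(long checkpointId) {
        for (int i = 0; i < pendingCheckpoints.size(); i++) {
            if (pendingCheckpoints.get(i).id == checkpointId) {
                return i;
            }
        }
        return -1;
    }

    public int pendingCount() {
        return pendingCheckpoints.size();
    }
}
```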
[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553994#comment-14553994 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/674#discussion_r30787324

    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---
    @@ -225,7 +226,16 @@ public void close() {
     	@Override
     	public void restoreState(long[] state) {
    -		// we maintain the offsets in Kafka, so nothing to do.
    +		if(lastOffsets == null) {
    +			LOG.warn("Restore state called before open() has been called");
    +			return;
    +		}
    +		LOG.info("Restoring state to {}", Arrays.toString(state));
    +		if(lastOffsets.length != state.length) {
    --- End diff --

    How about sanity checking before logging that things are going to happen? Usually gives better logging insights...
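The sanity checks discussed in this review comment (validate the restored state before logging or applying it) can be sketched as follows. The field and method names are hypothetical stand-ins, not the actual PersistentKafkaSource code, and the sketch throws instead of logging so the checks are observable in isolation.

```java
import java.util.Arrays;

// Sketch of "sanity check first, then act": restoreState() verifies that
// open() has run and that the restored state matches the partition count
// before any of it is applied.
public class RestoreStateSketch {

    private long[] lastOffsets;

    // open() allocates the per-partition offset array; -1 means "no offset yet".
    public void open(int numPartitions) {
        lastOffsets = new long[numPartitions];
        Arrays.fill(lastOffsets, -1L);
    }

    public void restoreState(long[] state) {
        // Check preconditions before touching any state, as the review suggests.
        if (lastOffsets == null) {
            throw new IllegalStateException("restoreState() called before open()");
        }
        if (lastOffsets.length != state.length) {
            throw new IllegalArgumentException("Restored state has " + state.length
                    + " partitions, expected " + lastOffsets.length);
        }
        System.arraycopy(state, 0, lastOffsets, 0, state.length);
    }

    public long[] offsets() {
        return lastOffsets;
    }
}
```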
[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553992#comment-14553992 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/674#issuecomment-104206104

    Looks good. Some of the tests look like lightweight, unit-style tests. It may be good to pull those out into a `PersistentKafkaSourceTest`. Otherwise, +1 to merge.
[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource
[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14543782#comment-14543782 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/674

    [FLINK-2004] Fix memory leak in presence of failed checkpoints for Kafka Source

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink2004

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/674.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #674

----
commit 27f11822b7db2716f3484def8ad350eb7e0b0893
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-14T09:45:30Z

    [FLINK-2004] Fix memory leak in presence of failed checkpoints in Kafka source

----
commit 36cb4758c200713a97858989ac73f117186ed9dc
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-14T13:57:18Z

    unused imports