[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-06-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567144#comment-14567144 ]

ASF GitHub Bot commented on FLINK-2004:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/674#issuecomment-107392647
  
Looks good!

Will merge this..


 Memory leak in presence of failed checkpoints in KafkaSource
 

 Key: FLINK-2004
 URL: https://issues.apache.org/jira/browse/FLINK-2004
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Robert Metzger
Priority: Critical
 Fix For: 0.9


 Checkpoints that fail never send a commit message to the tasks.
 Maintaining a map of all pending checkpoints introduces a memory leak, as
 entries for failed checkpoints will never be removed.
 Approaches to fix this:
   - The source cleans up entries from older checkpoints once a checkpoint is
     committed (simple implementation in a linked hash map)
   - The commit message could include the optional state handle (the source
     then need not maintain the map)
   - The checkpoint coordinator could send messages for failed checkpoints?
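
The first approach above can be sketched as follows. This is a hypothetical illustration, not Flink's actual code: the class and method names (`PendingOffsets`, `snapshot`, `commit`) are made up. The idea is that pending offsets are kept in insertion order, and committing a checkpoint also evicts every older entry; since an older checkpoint can never be committed after a newer one, dropping those entries plugs the leak.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the "clean up older checkpoints on commit" approach.
// A LinkedHashMap preserves insertion order, so everything before the
// committed checkpoint is known to be older (e.g. a failed checkpoint).
class PendingOffsets {
    private final LinkedHashMap<Long, long[]> pending = new LinkedHashMap<>();

    void snapshot(long checkpointId, long[] offsets) {
        pending.put(checkpointId, offsets.clone());
    }

    // Returns the offsets for checkpointId (or null if unknown), removing it
    // together with all older entries so the map cannot grow without bound.
    long[] commit(long checkpointId) {
        long[] committed = pending.get(checkpointId);
        if (committed == null) {
            return null;
        }
        Iterator<Map.Entry<Long, long[]>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            long id = it.next().getKey();
            it.remove();               // drop this entry (older or committed)
            if (id == checkpointId) {
                break;                 // newer pending checkpoints are kept
            }
        }
        return committed;
    }

    int size() {
        return pending.size();
    }
}
```

After committing checkpoint 2, an earlier failed checkpoint 1 is gone as well, while a newer pending checkpoint 3 survives.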



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-06-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14567475#comment-14567475 ]

ASF GitHub Bot commented on FLINK-2004:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/674




[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-05-27 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14560713#comment-14560713 ]

ASF GitHub Bot commented on FLINK-2004:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/674#issuecomment-105846281
  
What is the status of this pull request? Any pending changes, or is it 
blocked by anything?




[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-05-27 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14561122#comment-14561122 ]

ASF GitHub Bot commented on FLINK-2004:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/674#issuecomment-105951860
  
I have those unit-test-style tests all in the KafkaITCase because they
depend on the testing clusters started for the test.
They all need at least a running ZooKeeper instance.
I can of course put them into a different class, but this will further slow
down our tests because we spend more time starting and stopping ZooKeeper.

I've reworked the restore to reflect the open() / restoreState() order.
The PR has been updated.




[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-05-27 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14561037#comment-14561037 ]

ASF GitHub Bot commented on FLINK-2004:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/674#issuecomment-105926083
  
Sorry for not reacting. I was busy with other stuff but I'll now address 
your comments.




[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553987#comment-14553987 ]

ASF GitHub Bot commented on FLINK-2004:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/674#discussion_r30787124
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
 ---
@@ -236,13 +246,22 @@ public void restoreState(long[] state) {
 	@Override
 	public void commitCheckpoint(long checkpointId) {
 		LOG.info("Commit checkpoint {}", checkpointId);
-		long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
-		if(checkpointOffsets == null) {
+		final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+		if(posInMap == -1) {
 			LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
 			return;
 		}
+
+		long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
 		LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
--- End diff --

I think this log message can be improved, to something like "Committing offsets {} to Kafka".




[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553994#comment-14553994 ]

ASF GitHub Bot commented on FLINK-2004:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/674#discussion_r30787324
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
 ---
@@ -225,7 +226,16 @@ public void close() {
 
 	@Override
 	public void restoreState(long[] state) {
-		// we maintain the offsets in Kafka, so nothing to do.
+		if(lastOffsets == null) {
+			LOG.warn("Restore state called before open() has been called");
+			return;
+		}
+		LOG.info("Restoring state to {}", Arrays.toString(state));
+		if(lastOffsets.length != state.length) {
--- End diff --

How about sanity checking before logging that things are going to happen? 
Usually gives better logging insights...
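
The suggestion above, validating before logging, can be sketched like this. The class is illustrative, not the real PersistentKafkaSource; only the field and method names mirror the diff. Running the sanity checks first means the log never announces a restore that then fails partway through.

```java
import java.util.Arrays;

// Illustrative sketch: sanity-check the restored state before logging that
// the restore is going to happen, so log output reflects what actually ran.
class RestoreSketch {
    private long[] lastOffsets;

    void open(int numPartitions) {
        lastOffsets = new long[numPartitions];
    }

    void restoreState(long[] state) {
        // Sanity checks first ...
        if (lastOffsets == null) {
            throw new IllegalStateException("restoreState() called before open()");
        }
        if (lastOffsets.length != state.length) {
            throw new IllegalArgumentException("Restored state has " + state.length
                    + " partitions, but the source was opened with " + lastOffsets.length);
        }
        // ... then log and apply, knowing the restore will go through.
        System.out.println("Restoring state to " + Arrays.toString(state));
        System.arraycopy(state, 0, lastOffsets, 0, state.length);
    }

    long[] offsets() {
        return lastOffsets;
    }
}
```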




[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553992#comment-14553992 ]

ASF GitHub Bot commented on FLINK-2004:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/674#issuecomment-104206104
  
Looks good. Some of the tests look like unit-style tests that are sort of 
lightweight.

May be good to pull those out into a `PersistentKafkaSourceTest`.

Otherwise, +1 to merge




[jira] [Commented] (FLINK-2004) Memory leak in presence of failed checkpoints in KafkaSource

2015-05-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14543782#comment-14543782 ]

ASF GitHub Bot commented on FLINK-2004:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/674

[FLINK-2004] Fix memory leak in presence of failed checkpoints for Kafka
Source



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink2004

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/674.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #674


commit 27f11822b7db2716f3484def8ad350eb7e0b0893
Author: Robert Metzger rmetz...@apache.org
Date:   2015-05-14T09:45:30Z

[FLINK-2004] Fix memory leak in presense of failed checkpoints in Kafka 
source

commit 36cb4758c200713a97858989ac73f117186ed9dc
Author: Robert Metzger rmetz...@apache.org
Date:   2015-05-14T13:57:18Z

unused imports



