[jira] [Commented] (FLINK-2329) Refactor RPCs from within the ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-2329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620642#comment-14620642 ]

ASF GitHub Bot commented on FLINK-2329:

Github user mxm commented on the pull request: https://github.com/apache/flink/pull/893#issuecomment-120018025

Very sensible change in anticipation of the upcoming high availability changes. +1

Refactor RPCs from within the ExecutionGraph
Key: FLINK-2329
URL: https://issues.apache.org/jira/browse/FLINK-2329
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Fix For: 0.10

Currently, we store an {{ActorRef}} of the TaskManager in an {{Instance}} object. This {{ActorRef}} is used from within {{Executions}} to interact with the {{TaskManager}}. This is not a nice abstraction, since it does not hide implementation details. Because we need to add a leader session ID to messages sent by the {{Executions}} in order to support high availability, we would have to make the leader session ID available to the {{Execution}}. A better solution seems to be to replace the direct {{ActorRef}} interaction with an instance gateway abstraction which encapsulates the communication logic. With such an abstraction, it becomes easy to decorate messages transparently with a leader session ID. Therefore, I propose to refactor the current {{Instance}} communication and to introduce an {{InstanceGateway}} abstraction.
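For illustration, the proposed gateway boils down to a thin interface that wraps every outgoing message. The following is a minimal sketch with hypothetical names (InstanceGateway, LeaderSessionMessage, and AkkaInstanceGateway are illustrative, not necessarily the classes the PR introduces):

{code}
import java.util.UUID;
import akka.actor.ActorRef;

/** Hypothetical gateway that hides the TaskManager ActorRef from Executions. */
interface InstanceGateway {
    void tell(Object message);
}

/** Hypothetical envelope pairing a payload with the leader session ID. */
class LeaderSessionMessage {
    final UUID leaderSessionId;
    final Object payload;

    LeaderSessionMessage(UUID leaderSessionId, Object payload) {
        this.leaderSessionId = leaderSessionId;
        this.payload = payload;
    }
}

class AkkaInstanceGateway implements InstanceGateway {
    private final ActorRef taskManager;
    private final UUID leaderSessionId;

    AkkaInstanceGateway(ActorRef taskManager, UUID leaderSessionId) {
        this.taskManager = taskManager;
        this.leaderSessionId = leaderSessionId;
    }

    @Override
    public void tell(Object message) {
        // Every outgoing message is decorated transparently; the Execution
        // never handles the leader session ID itself.
        taskManager.tell(new LeaderSessionMessage(leaderSessionId, message), ActorRef.noSender());
    }
}
{code}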
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-119988165

Did we find a solution for the random port problem?
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620568#comment-14620568 ]

ASF GitHub Bot commented on FLINK-2288:

Github user mxm commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-12117

Yes that makes sense. So the user will always have to connect to the web interface of the leading job manager, right? We could only circumvent that by separating the web interface from the job manager.

Setup ZooKeeper for distributed coordination
Key: FLINK-2288
URL: https://issues.apache.org/jira/browse/FLINK-2288
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
Fix For: 0.10

Having standby JM instances for job manager high availability requires distributed coordination between JMs, TMs, and clients. For this, we will use ZooKeeper (ZK).

Pros:
- Proven solution (other projects use it for this as well)
- Apache TLP with a large community, docs, and a library with required recipes like leader election (see below)

Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[jira] [Commented] (FLINK-2339) Prevent asynchronous checkpoint calls from overtaking each other
[ https://issues.apache.org/jira/browse/FLINK-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620625#comment-14620625 ]

Stephan Ewen commented on FLINK-2339:

I have a patch coming up, currently testing on Travis...

Prevent asynchronous checkpoint calls from overtaking each other
Key: FLINK-2339
URL: https://issues.apache.org/jira/browse/FLINK-2339
Project: Flink
Issue Type: Improvement
Components: TaskManager
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 0.10

Currently, when checkpoint state materialization takes very long and the checkpoint interval is low, the asynchronous calls to trigger checkpoints (on the sources) can overtake prior calls. We can fix that by making sure that all calls are dispatched in order by the same thread, rather than spawning a new thread for each call.
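The fix described in the issue (dispatching all trigger calls in order from a single thread) can be sketched with a single-threaded executor; the class and method names below are illustrative, not Flink's actual code:

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedCheckpointDispatcher {

    // One thread processes all triggers strictly in submission order.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();

    public void triggerCheckpoint(long checkpointId, long timestamp) {
        // Instead of spawning a new thread per call (which can overtake a slow
        // predecessor), enqueue the work: checkpoint n+1 cannot start before n ends.
        dispatcher.execute(() -> performCheckpoint(checkpointId, timestamp));
    }

    private void performCheckpoint(long checkpointId, long timestamp) {
        // state materialization for the source would happen here
    }
}
{code}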
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-119996182

In HA mode, JobManagers start with a random free port. That is fine, because no one connects to them based on a config value, but only based on ZooKeeper entries.
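Binding to a random free port is a one-liner in Java: passing port 0 lets the OS pick. A minimal sketch, with the ZooKeeper publication step left as a comment:

{code}
import java.io.IOException;
import java.net.ServerSocket;

public class RandomPortBinding {
    public static void main(String[] args) throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) { // port 0: OS picks a free port
            int actualPort = socket.getLocalPort();
            // In HA mode the JobManager would publish this port to ZooKeeper,
            // so TaskManagers and clients discover it without any config value.
            System.out.println("bound to port " + actualPort);
        }
    }
}
{code}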
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620572#comment-14620572 ]

ASF GitHub Bot commented on FLINK-2288:

Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-12744

The web interface is, modulo some objects which are not serializable, already independent of the JobManager. It should not be a big problem to only have one web server which also retrieves the leading JobManager from ZooKeeper and then serves the information from the leader.

On Thu, Jul 9, 2015 at 4:24 PM, Max <notificati...@github.com> wrote:

Yes that makes sense. So the user will always have to connect to the web interface of the leading job manager, right? We could only circumvent that by separating the web interface from the job manager.

Setup ZooKeeper for distributed coordination
Key: FLINK-2288
URL: https://issues.apache.org/jira/browse/FLINK-2288
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
Fix For: 0.10

Having standby JM instances for job manager high availability requires distributed coordination between JMs, TMs, and clients. For this, we will use ZooKeeper (ZK).

Pros:
- Proven solution (other projects use it for this as well)
- Apache TLP with a large community, docs, and a library with required recipes like leader election (see below)

Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
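A minimal sketch of the lookup described above, assuming a hypothetical /flink/leader znode to which the leader election recipe writes the leader's address (both the path and the payload format are assumptions, not Flink's actual layout):

{code}
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.ZooKeeper;

public class LeaderLookup {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> { });
        // "/flink/leader" is an assumed znode holding the leading JobManager's address.
        byte[] data = zk.getData("/flink/leader", false, null);
        String leaderAddress = new String(data, StandardCharsets.UTF_8);
        System.out.println("serving job data from " + leaderAddress);
        zk.close();
    }
}
{code}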
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-12744

The web interface is, modulo some objects which are not serializable, already independent of the JobManager. It should not be a big problem to only have one web server which also retrieves the leading JobManager from ZooKeeper and then serves the information from the leader.

On Thu, Jul 9, 2015 at 4:24 PM, Max <notificati...@github.com> wrote:

Yes that makes sense. So the user will always have to connect to the web interface of the leading job manager, right? We could only circumvent that by separating the web interface from the job manager.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120029960

Looks like this change breaks the YARN integration. The YARN WordCount no longer works.
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-120036933

Hello @vasia. I would like to work on both versions of Adamic-Adar. As the JIRA did not ask for an approximate version, it was suggested that I create another JIRA issue for a library method that computes an approximate Adamic-Adar similarity using Bloom filters.

I have a query about the Bloom filters. Since a Bloom filter only tells us whether an element belongs to the set or not, if both vertices have Bloom filters as values, how will we know what to emit? For example, for Vertex 3 the bits '1,4,13' are set and for Vertex 5 the bits '2,4,13' are set. When we use the method you suggested, a logical AND gives us the intersection of the Bloom filters. After this, do you suggest that we keep another hashtable that tracks the value-vertex relation? Or do we just emit 5,4,1/log(d3) and use the identity map as the hash function? That would mean each vertex has n bits as its value, where n is the number of vertices in the graph. I hope I was clear in my query.

TL;DR: We would have to use an identity hash function, which implies that each vertex needs n bits of memory as its value. Is it okay to use this much memory? If there is some other approach, please let me know. Bloom filters seem to be more useful for finding the size of the intersection or union, but here we need to know which vertices are common. The only other way that I can roughly imagine is that we get the hashed edges in a dataset, just like 5,4,1/(logd3)..., use the same hash function on all the graph edges, and then join the datasets over fields 1 and 2. Please tell me if there is a more efficient way, or which of these two you would prefer.
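For illustration, the identity-hash scheme discussed above reduces to an n-bit set per vertex and a logical AND, as in this self-contained sketch using vertices 3 and 5 from the example:

{code}
import java.util.BitSet;

public class NeighborhoodIntersection {
    public static void main(String[] args) {
        int n = 16; // number of vertices in the (hypothetical) graph

        BitSet neighborsOf3 = new BitSet(n); // bits 1, 4, 13 set, as in the example
        neighborsOf3.set(1); neighborsOf3.set(4); neighborsOf3.set(13);

        BitSet neighborsOf5 = new BitSet(n); // bits 2, 4, 13 set
        neighborsOf5.set(2); neighborsOf5.set(4); neighborsOf5.set(13);

        BitSet common = (BitSet) neighborsOf3.clone();
        common.and(neighborsOf5); // logical AND leaves bits 4 and 13 set

        // With an identity hash, each surviving bit index *is* a vertex ID,
        // so (u, v, 1/log(deg)) triples can be emitted directly from it.
        common.stream().forEach(id -> System.out.println("common neighbor: " + id));
    }
}
{code}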
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34268803

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---
@@ -212,19 +218,19 @@ public InputGate getInputGate(int index) {
 		return inputGates;
 	}

-	@Override
-	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-		AccumulatorEvent evt;
-		try {
-			evt = new AccumulatorEvent(getJobID(), accumulators);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
-		}
-
-		ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt);
-		jobManagerActor.tell(accResult, ActorRef.noSender());
-	}
+//	@Override

--- End diff --

This can be properly removed, no?
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620697#comment-14620697 ]

ASF GitHub Bot commented on FLINK-2292:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34268803

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---
@@ -212,19 +218,19 @@ public InputGate getInputGate(int index) {
 		return inputGates;
 	}

-	@Override
-	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-		AccumulatorEvent evt;
-		try {
-			evt = new AccumulatorEvent(getJobID(), accumulators);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
-		}
-
-		ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt);
-		jobManagerActor.tell(accResult, ActorRef.noSender());
-	}
+//	@Override

--- End diff --

This can be properly removed, no?

Report accumulators periodically while job is running
Key: FLINK-2292
URL: https://issues.apache.org/jira/browse/FLINK-2292
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 0.10

Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[GitHub] flink pull request: [FLINK-2235] fix calculation of free memory fo...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/859#issuecomment-119987554

1/4 of the physical memory seems sensible and indeed returns almost the same memory size as the maximum memory setting on my machine. I've adapted the pull request.
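For reference, a minimal sketch of the 1/4-of-physical-memory heuristic; reading the physical memory size through the com.sun management extension is an assumption about the mechanism, not necessarily what the pull request does:

{code}
import java.lang.management.ManagementFactory;

public class MemorySizing {
    public static void main(String[] args) {
        com.sun.management.OperatingSystemMXBean os =
                (com.sun.management.OperatingSystemMXBean)
                        ManagementFactory.getOperatingSystemMXBean();
        long physical = os.getTotalPhysicalMemorySize();
        long managed = physical / 4; // mirrors the JVM default of 1/4 physical for max heap
        System.out.println("would allocate " + (managed >> 20) + " MB");
    }
}
{code}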
[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times
[ https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620613#comment-14620613 ]

ASF GitHub Bot commented on FLINK-2008:

Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34262391

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---
@@ -53,6 +62,8 @@
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase {

+	private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointingITCase.class);

--- End diff --

Before I decided to undo parts of the checkpointing changes (sending the full state back from the JM to the tasks), I had added an additional test there. I can delete the logger factory.

PersistentKafkaSource is sometimes emitting tuples multiple times
Key: FLINK-2008
URL: https://issues.apache.org/jira/browse/FLINK-2008
Project: Flink
Issue Type: Bug
Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

The PersistentKafkaSource is expected to emit records exactly once. Two test cases of the KafkaITCase are sporadically failing because records are emitted multiple times. Affected tests: {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been changed manually in ZK:
{code}
java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2]
{code}
{{brokerFailureTest()}} also fails:
{code}
05/13/2015 08:13:16  Custom source -> Stream Sink(1/1) switched to FAILED
java.lang.AssertionError: Received tuple with value 21 twice
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertFalse(Assert.java:64)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
	at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
	at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
	at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:745)
{code}
[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34262141

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---
@@ -53,6 +62,8 @@
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase {

+	private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointingITCase.class);

--- End diff --

These changes here also seem accidentally committed.
[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times
[ https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620610#comment-14620610 ]

ASF GitHub Bot commented on FLINK-2008:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34262141

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---
@@ -53,6 +62,8 @@
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase {

+	private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointingITCase.class);

--- End diff --

These changes here also seem accidentally committed.

PersistentKafkaSource is sometimes emitting tuples multiple times
Key: FLINK-2008
URL: https://issues.apache.org/jira/browse/FLINK-2008
Project: Flink
Issue Type: Bug
Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

The PersistentKafkaSource is expected to emit records exactly once. Two test cases of the KafkaITCase are sporadically failing because records are emitted multiple times. Affected tests: {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been changed manually in ZK:
{code}
java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2]
{code}
{{brokerFailureTest()}} also fails:
{code}
05/13/2015 08:13:16  Custom source -> Stream Sink(1/1) switched to FAILED
java.lang.AssertionError: Received tuple with value 21 twice
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertFalse(Assert.java:64)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
	at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
	at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
	at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:745)
{code}
[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times
[ https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620609#comment-14620609 ]

ASF GitHub Bot commented on FLINK-2008:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34262084

--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger

--- End diff --

Was this accidentally committed?

PersistentKafkaSource is sometimes emitting tuples multiple times
Key: FLINK-2008
URL: https://issues.apache.org/jira/browse/FLINK-2008
Project: Flink
Issue Type: Bug
Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

The PersistentKafkaSource is expected to emit records exactly once. Two test cases of the KafkaITCase are sporadically failing because records are emitted multiple times. Affected tests: {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been changed manually in ZK:
{code}
java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2]
{code}
{{brokerFailureTest()}} also fails:
{code}
05/13/2015 08:13:16  Custom source -> Stream Sink(1/1) switched to FAILED
java.lang.AssertionError: Received tuple with value 21 twice
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertFalse(Assert.java:64)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
	at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
	at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
	at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:745)
{code}
[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34262145

--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger

--- End diff --

Yes
[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times
[ https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620611#comment-14620611 ]

ASF GitHub Bot commented on FLINK-2008:

Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34262145

--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger

--- End diff --

Yes

PersistentKafkaSource is sometimes emitting tuples multiple times
Key: FLINK-2008
URL: https://issues.apache.org/jira/browse/FLINK-2008
Project: Flink
Issue Type: Bug
Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

The PersistentKafkaSource is expected to emit records exactly once. Two test cases of the KafkaITCase are sporadically failing because records are emitted multiple times. Affected tests: {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been changed manually in ZK:
{code}
java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2]
{code}
{{brokerFailureTest()}} also fails:
{code}
05/13/2015 08:13:16  Custom source -> Stream Sink(1/1) switched to FAILED
java.lang.AssertionError: Received tuple with value 21 twice
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertFalse(Assert.java:64)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
	at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
	at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
	at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:745)
{code}
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620634#comment-14620634 ]

ASF GitHub Bot commented on FLINK-2288:

Github user mxm commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-120015233

Alright, I've opened a JIRA for this: https://issues.apache.org/jira/browse/FLINK-2340

Setup ZooKeeper for distributed coordination
Key: FLINK-2288
URL: https://issues.apache.org/jira/browse/FLINK-2288
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
Fix For: 0.10

Having standby JM instances for job manager high availability requires distributed coordination between JMs, TMs, and clients. For this, we will use ZooKeeper (ZK).

Pros:
- Proven solution (other projects use it for this as well)
- Apache TLP with a large community, docs, and a library with required recipes like leader election (see below)

Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620654#comment-14620654 ]

ASF GitHub Bot commented on FLINK-2292:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34266800

--- Diff: flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);

--- End diff --

This must have been accidentally committed.

Report accumulators periodically while job is running
Key: FLINK-2292
URL: https://issues.apache.org/jira/browse/FLINK-2292
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 0.10

Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34266800

--- Diff: flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);

--- End diff --

This must have been accidentally committed.
[jira] [Commented] (FLINK-2235) Local Flink cluster allocates too much memory
[ https://issues.apache.org/jira/browse/FLINK-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620538#comment-14620538 ]

ASF GitHub Bot commented on FLINK-2235:

Github user mxm commented on the pull request: https://github.com/apache/flink/pull/859#issuecomment-119987554

1/4 of the physical memory seems sensible and indeed returns almost the same memory size as the maximum memory setting on my machine. I've adapted the pull request.

Local Flink cluster allocates too much memory
Key: FLINK-2235
URL: https://issues.apache.org/jira/browse/FLINK-2235
Project: Flink
Issue Type: Bug
Components: Local Runtime, TaskManager
Affects Versions: 0.9
Environment: Oracle JDK 1.6.0_65-b14-462, Eclipse
Reporter: Maximilian Michels
Priority: Minor

When executing a Flink job locally, the task manager gets initialized with an insane amount of memory. After a quick look in the code, it seems that the call to {{EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()}} returns a wrong estimate of the heap memory size. Moreover, the same user switched to Oracle JDK 1.8 and that made the error disappear, so I'm guessing this is some Java 1.6 quirk.
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-12117

Yes that makes sense. So the user will always have to connect to the web interface of the leading job manager, right? We could only circumvent that by separating the web interface from the job manager.
[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34262472

--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java ---
@@ -53,13 +62,22 @@ public boolean isEndOfStream(T nextElement) {
 	@Override
 	public byte[] serialize(T element) {
-		DataOutputSerializer dos = new DataOutputSerializer(16);
+		if(dos == null) {
+			dos = new DataOutputSerializer(1);

--- End diff --

I think a starting size of `16` is nicer than one of `1`
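The pattern under review is lazy one-time initialization of a reusable buffer. A minimal sketch, with ByteArrayOutputStream standing in for Flink's DataOutputSerializer and using the suggested starting size of 16:

{code}
import java.io.ByteArrayOutputStream;

class LazyBufferHolder {
    private ByteArrayOutputStream buffer; // created once, on first use

    byte[] serialize(byte[] payload) {
        if (buffer == null) {
            // 16 rather than 1: a 1-byte buffer is guaranteed to regrow immediately
            buffer = new ByteArrayOutputStream(16);
        }
        buffer.reset(); // reuse the backing array across calls
        buffer.write(payload, 0, payload.length);
        return buffer.toByteArray();
    }
}
{code}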
[jira] [Created] (FLINK-2340) Provide standalone mode for web interface of the JobManager
Maximilian Michels created FLINK-2340:

Summary: Provide standalone mode for web interface of the JobManager
Key: FLINK-2340
URL: https://issues.apache.org/jira/browse/FLINK-2340
Project: Flink
Issue Type: Improvement
Components: JobManager
Affects Versions: 0.10
Reporter: Maximilian Michels

With the latest changes to enable high availability, the web interface's address may switch to one of the standby job manager nodes. As an enhancement, we could decouple the web interface and let it automatically retrieve data from the currently leading job manager.
[jira] [Created] (FLINK-2341) Deadlock in SpilledSubpartitionViewAsyncIO
Stephan Ewen created FLINK-2341:

Summary: Deadlock in SpilledSubpartitionViewAsyncIO
Key: FLINK-2341
URL: https://issues.apache.org/jira/browse/FLINK-2341
Project: Flink
Issue Type: Bug
Components: Distributed Runtime
Affects Versions: 0.9, 0.10
Reporter: Stephan Ewen
Assignee: Ufuk Celebi
Priority: Critical
Fix For: 0.9, 0.10

It may be that the deadlock is caused by the way the {{SpilledSubpartitionViewTest}} is written.

{code}
Found one Java-level deadlock:
=============================
pool-25-thread-2:
  waiting to lock monitor 0x7f66f4932468 (object 0xfa1478f0, a java.lang.Object),
  which is held by IOManager reader thread #1
IOManager reader thread #1:
  waiting to lock monitor 0x7f66f4931160 (object 0xfa029768, a java.lang.Object),
  which is held by pool-25-thread-2

Java stack information for the threads listed above:
===================================================
pool-25-thread-2:
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.notifyError(SpilledSubpartitionViewAsyncIO.java:304)
	- waiting to lock 0xfa1478f0 (a java.lang.Object)
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.onAvailableBuffer(SpilledSubpartitionViewAsyncIO.java:256)
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$300(SpilledSubpartitionViewAsyncIO.java:42)
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:367)
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:353)
	at org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:135)
	- locked 0xfa029768 (a java.lang.Object)
	at org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
	- locked 0xfa3a1a20 (a java.lang.Object)
	at org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:95)
	at org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:39)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
	at java.util.concurrent.FutureTask.run(FutureTask.java:166)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:701)

IOManager reader thread #1:
	at org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:127)
	- waiting to lock 0xfa029768 (a java.lang.Object)
	at org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
	- locked 0xfa3a1ea0 (a java.lang.Object)
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.returnBufferFromIOThread(SpilledSubpartitionViewAsyncIO.java:270)
	- locked 0xfa1478f0 (a java.lang.Object)
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$100(SpilledSubpartitionViewAsyncIO.java:42)
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:338)
	at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:328)
	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199)
	at org.apache.flink.runtime.io.disk.iomanager.BufferReadRequest.requestDone(AsynchronousFileIOChannel.java:431)
	at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:377)
{code}

The full log with the deadlock stack traces can be found here: https://s3.amazonaws.com/archive.travis-ci.org/jobs/70232347/log.txt
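The cycle in the trace is a classic lock-order inversion: one thread takes the buffer-pool monitor and then needs the view monitor, while the I/O thread takes them in the opposite order. A minimal self-contained sketch of that pattern (locks and thread roles are stand-ins for the monitors in the trace, not the actual Flink code):

{code}
public class LockOrderInversion {
    public static void main(String[] args) {
        final Object bufferPoolLock = new Object(); // stands in for 0xfa029768
        final Object viewLock = new Object();       // stands in for 0xfa1478f0

        Thread consumer = new Thread(() -> {
            synchronized (bufferPoolLock) {     // recycle() takes the pool lock first...
                synchronized (viewLock) {       // ...then the callback needs the view lock
                    // hand the buffer back to the view
                }
            }
        });

        Thread ioReader = new Thread(() -> {
            synchronized (viewLock) {           // returnBufferFromIOThread() takes the view lock first...
                synchronized (bufferPoolLock) { // ...then recycle() needs the pool lock
                    // return the buffer to the pool
                }
            }
        });

        // If each thread acquires its first monitor before either takes its
        // second, both block forever on the lock the other holds.
        consumer.start();
        ioReader.start();
    }
}
{code}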
[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34261966

--- Diff: docs/apis/streaming_guide.md ---
@@ -1300,6 +1300,9 @@ Another way of exposing user defined operator state for the Flink runtime for ch
 When the user defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)` methods will be executed to draw and restore function state.
+In addition to that, user functions can also implement the `CheckpointNotifier` interface to receive notifications on completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
+Note that there is no guarantee for the user function to receive a notification once the checkpoint is complete.

--- End diff --

Let's write it like:

Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.
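A minimal sketch of a user function that follows the suggested wording, committing on notification and letting a later notification subsume missed earlier ones. The interface names match the docs quoted above (the era's CheckpointNotifier and Checkpointed), but the buffering helper is hypothetical:

{code}
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

public class CommittingSink implements Checkpointed<Long>, CheckpointNotifier {

    private long lastCommittedCheckpoint = -1L;

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return lastCommittedCheckpoint;
    }

    @Override
    public void restoreState(Long state) {
        lastCommittedCheckpoint = state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // A notification can be lost between checkpoint completion and delivery,
        // so commit everything up to checkpointId, not just checkpointId itself.
        if (checkpointId > lastCommittedCheckpoint) {
            commitBufferedResultsUpTo(checkpointId); // hypothetical helper
            lastCommittedCheckpoint = checkpointId;
        }
    }

    private void commitBufferedResultsUpTo(long checkpointId) {
        // flush results buffered for all checkpoints <= checkpointId
    }
}
{code}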
[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34262084

--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger

--- End diff --

Was this accidentally committed?
[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times
[ https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620608#comment-14620608 ]

ASF GitHub Bot commented on FLINK-2008:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/895#discussion_r34261966

--- Diff: docs/apis/streaming_guide.md ---
@@ -1300,6 +1300,9 @@ Another way of exposing user defined operator state for the Flink runtime for ch
 When the user defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)` methods will be executed to draw and restore function state.
+In addition to that, user functions can also implement the `CheckpointNotifier` interface to receive notifications on completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
+Note that there is no guarantee for the user function to receive a notification once the checkpoint is complete.

--- End diff --

Let's write it like:

Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.

PersistentKafkaSource is sometimes emitting tuples multiple times
Key: FLINK-2008
URL: https://issues.apache.org/jira/browse/FLINK-2008
Project: Flink
Issue Type: Bug
Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

The PersistentKafkaSource is expected to emit records exactly once. Two test cases of the KafkaITCase are sporadically failing because records are emitted multiple times. Affected tests: {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been changed manually in ZK:
{code}
java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2]
{code}
{{brokerFailureTest()}} also fails:
{code}
05/13/2015 08:13:16  Custom source -> Stream Sink(1/1) switched to FAILED
java.lang.AssertionError: Received tuple with value 21 twice
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertFalse(Assert.java:64)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
	at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
	at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
	at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:745)
{code}
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-120033517

Hello @vasia. I would like to work on both versions of Adamic-Adar. As the JIRA did not ask for an approximate version, it was suggested that I create another JIRA issue for a library method that computes an approximate Adamic-Adar similarity using Bloom filters.

I have a query about the Bloom filters. Since a Bloom filter only tells us whether an element belongs to the set or not, if both vertices have Bloom filters as values, how will we know what to search for in the other set? For example, for Vertex 3 the bits '1,4,13' are set and for Vertex 5 the bits '2,4,13' are set. When we use the method you suggested, we will find that 4 and 13 are set for 5 too. Now what tuple should it emit? Do you suggest that we keep another hashtable that tracks the value-vertex relation? Or do we just emit 5,4,1/log(d3) and use the identity map as the hash function? That would mean each vertex has n bits as its value, where n is the number of vertices in the graph. I hope I was clear in my query.

TL;DR: We would have to use an identity hash function, which implies that each vertex needs n bits of memory as its value. Is it okay to use this much memory? If there is some other approach, please let me know. Bloom filters seem to be more useful for finding the size of the intersection or union, but here we need to know which vertices are common. The only other way that I can roughly imagine is that we get the hashed edges in a dataset, just like 5,4,1/(logd3)..., use the same hash function on all the graph edges, and then join the datasets over fields 1 and 2. Please tell me if there is a more efficient way, or which of these two you would prefer.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34269026

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -237,12 +246,13 @@ public Task(TaskDeploymentDescriptor tdd,
 		this.memoryManager = checkNotNull(memManager);
 		this.ioManager = checkNotNull(ioManager);
-		this.broadcastVariableManager =checkNotNull(bcVarManager);
+		this.broadcastVariableManager = checkNotNull(bcVarManager);
+		this.accumulatorRegistry = accumulatorRegistry;
 		this.jobManager = checkNotNull(jobManagerActor);
 		this.taskManager = checkNotNull(taskManagerActor);
 		this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout));
-

--- End diff --

Lots of auto-reformats...
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620708#comment-14620708 ]

ASF GitHub Bot commented on FLINK-2292:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34269026

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -237,12 +246,13 @@ public Task(TaskDeploymentDescriptor tdd,
 		this.memoryManager = checkNotNull(memManager);
 		this.ioManager = checkNotNull(ioManager);
-		this.broadcastVariableManager =checkNotNull(bcVarManager);
+		this.broadcastVariableManager = checkNotNull(bcVarManager);
+		this.accumulatorRegistry = accumulatorRegistry;
 		this.jobManager = checkNotNull(jobManagerActor);
 		this.taskManager = checkNotNull(taskManagerActor);
 		this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout));
-

--- End diff --

Lots of auto-reformats...

Report accumulators periodically while job is running
Key: FLINK-2292
URL: https://issues.apache.org/jira/browse/FLINK-2292
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 0.10

Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620707#comment-14620707 ]

ASF GitHub Bot commented on FLINK-2292:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34268940

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -172,13 +173,20 @@
 	/** The library cache, from which the task can request its required JAR files */
 	private final LibraryCacheManager libraryCache;

 	/** The cache for user-defined files that the invokable requires */
 	private final FileCache fileCache;

 	/** The gateway to the network stack, which handles inputs and produced results */
 	private final NetworkEnvironment network;

+	/** The registry of this task which enables live reporting of accumulators */
+	private final AccumulatorRegistry accumulatorRegistry;
+
+	public AccumulatorRegistry getAccumulatorRegistry() {

--- End diff --

The structure of the class declares fields in one section and getters in another. It would be good to follow that; it makes the code easier to navigate.

Report accumulators periodically while job is running
Key: FLINK-2292
URL: https://issues.apache.org/jira/browse/FLINK-2292
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 0.10

Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620750#comment-14620750 ]

ASF GitHub Bot commented on FLINK-2292:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34271710

--- Diff: flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);

--- End diff --

Probably the reason that the YARN tests are broken...

Report accumulators periodically while job is running
Key: FLINK-2292
URL: https://issues.apache.org/jira/browse/FLINK-2292
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 0.10

Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120054333

There is a potential modification conflict: drawing a snapshot for serialization while registering a new accumulator can lead to a ConcurrentModificationException in the drawing of the snapshot.
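A minimal sketch of the race and one way to close it: guard both registration and snapshotting with the same lock and copy the map under that lock (map and lock names are illustrative, not Flink's actual registry code):

{code}
import java.util.HashMap;
import java.util.Map;

class AccumulatorSnapshotter {
    private final Object lock = new Object();
    private final Map<String, Object> accumulators = new HashMap<>();

    void register(String name, Object accumulator) {
        synchronized (lock) {
            accumulators.put(name, accumulator);
        }
    }

    Map<String, Object> snapshot() {
        synchronized (lock) {
            // Copy under the lock: iterating the live map while another thread
            // registers would throw ConcurrentModificationException.
            return new HashMap<>(accumulators);
        }
    }
}
{code}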
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34273441

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -601,6 +636,11 @@ void markFinished() {
 		}
 	}

+	synchronized (accumulatorLock) {

--- End diff --

Ignore this comment. The lock is good :-)
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620786#comment-14620786 ]

ASF GitHub Bot commented on FLINK-2292:

Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120058469

The naming of the accumulators sometimes refers to flink vs. user-defined, and sometimes to internal vs. external. Can we make this consistent? I actually like the flink vs. user-defined naming better.

Report accumulators periodically while job is running
Key: FLINK-2292
URL: https://issues.apache.org/jira/browse/FLINK-2292
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 0.10

Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620776#comment-14620776 ]

ASF GitHub Bot commented on FLINK-2292:

Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34273136

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -601,6 +636,11 @@ void markFinished() {
 		}
 	}

+	synchronized (accumulatorLock) {

--- End diff --

This lock here seems redundant. No place is looking for those two to be in sync.

Report accumulators periodically while job is running
Key: FLINK-2292
URL: https://issues.apache.org/jira/browse/FLINK-2292
Project: Flink
Issue Type: Sub-task
Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 0.10

Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34274318 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -172,13 +173,20 @@ /** The library cache, from which the task can request its required JAR files */ private final LibraryCacheManager libraryCache; - + /** The cache for user-defined files that the invokable requires */ private final FileCache fileCache; - + /** The gateway to the network stack, which handles inputs and produced results */ private final NetworkEnvironment network; + /** The registry of this task which enables live reporting of accumulators */ + private final AccumulatorRegistry accumulatorRegistry; --- End diff -- Since the AccumulatorRegistry is only used task-internally, and always retrieved from there, it should be initialized internally. This saves one constructor parameter and helps with separation of concerns. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
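A hedged sketch of the change suggested above (constructor shape abbreviated, not the actual Task signature): the registry is created inside the Task instead of being passed in, removing a constructor parameter.

{code}
// Illustrative sketch, not the actual Flink classes: the Task builds its
// own AccumulatorRegistry since nothing outside the task ever supplies one.
class TaskSketch {
    private final AccumulatorRegistrySketch accumulatorRegistry;

    TaskSketch(String jobId, String executionAttemptId) {
        // initialized internally: one fewer constructor parameter, and the
        // registry's lifecycle is owned entirely by the task
        this.accumulatorRegistry = new AccumulatorRegistrySketch(jobId, executionAttemptId);
    }
}

class AccumulatorRegistrySketch {
    AccumulatorRegistrySketch(String jobId, String executionAttemptId) {
        // would hold per-attempt accumulator state
    }
}
{code}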
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620768#comment-14620768 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120054333 There is a potential modification conflict: Drawing a snapshot for serialization and registering a new accumulator concurrently can lead to a ConcurrentModificationException in the drawing of the snapshot. Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620801#comment-14620801 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34274318 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -172,13 +173,20 @@ /** The library cache, from which the task can request its required JAR files */ private final LibraryCacheManager libraryCache; - + /** The cache for user-defined files that the invokable requires */ private final FileCache fileCache; - + /** The gateway to the network stack, which handles inputs and produced results */ private final NetworkEnvironment network; + /** The registry of this task which enables live reporting of accumulators */ + private final AccumulatorRegistry accumulatorRegistry; --- End diff -- Since the AccumulatorRegistry is only used task-internally, and always retrieved from there, it should be initialized internally. This saves one constructor parameter and helps with separation of concerns. Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620810#comment-14620810 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34274983 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.accumulators; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + + +/** + * Main accumulator registry which encapsulates internal and user-defined accumulators. + */ +public class AccumulatorRegistry { --- End diff -- I do not understand the differentiation between the implementation of the `Internal` and `External` registry. From the usage pattern, both are accessed and initialized with a hash map. In one case the hash map is created by the caller, in the other by the registry. I have not found a place where it would not work for the registry to always create the map immediately. Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34274983 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.accumulators; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
+import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + + +/** + * Main accumulator registry which encapsulates internal and user-defined accumulators. + */ +public class AccumulatorRegistry { --- End diff -- I do not understand the differentiation between the implementation of the `Internal` and `External` registry. From the usage pattern, both are accessed and initialized with a hash map. In one case the hash map is created by the caller, in the other by the registry. I have not found a place where it would not work for the registry to always create the map immediately. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
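The simplification suggested above could look roughly like this (hedged sketch, illustrative names): one registry shape that always creates and owns its backing map eagerly, so callers never supply one.

{code}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the suggested unification: rather than one variant
// taking its map from the caller and another creating it itself, the
// registry always creates its backing map immediately and owns it.
class UnifiedRegistrySketch<V> {
    private final Map<String, V> accumulators = new HashMap<String, V>();

    void register(String name, V accumulator) {
        accumulators.put(name, accumulator);
    }

    V get(String name) {
        return accumulators.get(name);
    }
}
{code}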
[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/895#issuecomment-120065921 Except for the issue mentioned by Gyula (the double commit of the head), this looks good. I would like to merge this later today or tomorrow. Could address Gyula's comment while merging... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620724#comment-14620724 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34270017 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java --- @@ -57,6 +58,12 @@ private final BarrierBuffer barrierBuffer; + /** +* Counters for the number of bytes read and records processed. +*/ + private LongCounter numRecordsRead = null; --- End diff -- `null` initializations are actually redundant. They still get executed (for OpenJDK javac and Oracle javac), so it is actually overhead for no reason. Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34270017 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java --- @@ -57,6 +58,12 @@ private final BarrierBuffer barrierBuffer; + /** +* Counters for the number of bytes read and records processed. +*/ + private LongCounter numRecordsRead = null; --- End diff -- `null` initializations are actually redundant. They still get executed (for OpenJDK javac and Oracle javac), so it is actually overhead for no reason. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
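For reference, the two field declarations below are semantically identical for reference types; the explicit {{= null}} only adds a redundant store in the generated constructor code, which is the overhead the comment above points out.

{code}
class Counters {
    private Object explicitly = null; // javac still emits this store
    private Object implicitly;        // reference fields default to null anyway
}
{code}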
[jira] [Commented] (FLINK-2340) Provide standalone mode for web interface of the JobManager
[ https://issues.apache.org/jira/browse/FLINK-2340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620767#comment-14620767 ] Ufuk Celebi commented on FLINK-2340: +1 I think separation is the way to go. Provide standalone mode for web interface of the JobManager --- Key: FLINK-2340 URL: https://issues.apache.org/jira/browse/FLINK-2340 Project: Flink Issue Type: Improvement Components: JobManager Affects Versions: 0.10 Reporter: Maximilian Michels With the latest changes to enable high availability, the web interface's address may switch to one of the standby job manager nodes. As an enhancement, we could decouple the web interface and let it automatically retrieve data from the currently leading job manager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120058469 The naming of the accumulators refers sometimes to flink vs. user-defined, and sometimes to internal vs. external. Can we make this consistent? I actually like the flink vs. user-defined naming better. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34271710 --- Diff: flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java --- @@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception { // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); --- End diff -- Probably the reason that the YARN tests are broken... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2341) Deadlock in SpilledSubpartitionViewAsyncIO
[ https://issues.apache.org/jira/browse/FLINK-2341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620782#comment-14620782 ] Ufuk Celebi commented on FLINK-2341: Thanks for the stacktrace. I will look into it soon. The asynchronous variant is not used by default, so no users are affected in the meantime. Deadlock in SpilledSubpartitionViewAsyncIO -- Key: FLINK-2341 URL: https://issues.apache.org/jira/browse/FLINK-2341 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 0.9, 0.10 Reporter: Stephan Ewen Assignee: Ufuk Celebi Priority: Critical Fix For: 0.9, 0.10 It may be that the deadlock is because of the way the {{SpilledSubpartitionViewTest}} is written: {code} Found one Java-level deadlock: pool-25-thread-2: waiting to lock monitor 0x7f66f4932468 (object 0xfa1478f0, a java.lang.Object), which is held by IOManager reader thread #1 IOManager reader thread #1: waiting to lock monitor 0x7f66f4931160 (object 0xfa029768, a java.lang.Object), which is held by pool-25-thread-2 Java stack information for the threads listed above: pool-25-thread-2: at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.notifyError(SpilledSubpartitionViewAsyncIO.java:304) - waiting to lock 0xfa1478f0 (a java.lang.Object) at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.onAvailableBuffer(SpilledSubpartitionViewAsyncIO.java:256) at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$300(SpilledSubpartitionViewAsyncIO.java:42) at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:367) at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:353) at org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:135) - locked 0xfa029768 (a java.lang.Object) at org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119) - locked 0xfa3a1a20 (a java.lang.Object) at org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:95) at org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:39) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334) at java.util.concurrent.FutureTask.run(FutureTask.java:166) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:701) IOManager reader thread #1: at org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:127) - waiting to lock 0xfa029768 (a java.lang.Object) at org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119) - locked 0xfa3a1ea0 (a java.lang.Object) at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.returnBufferFromIOThread(SpilledSubpartitionViewAsyncIO.java:270) - locked 0xfa1478f0 (a java.lang.Object) at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$100(SpilledSubpartitionViewAsyncIO.java:42) at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:338) at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:328) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199) at org.apache.flink.runtime.io.disk.iomanager.BufferReadRequest.requestDone(AsynchronousFileIOChannel.java:431) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:377) {code} The full log with the deadlock stack traces can be found here: https://s3.amazonaws.com/archive.travis-ci.org/jobs/70232347/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
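Stripped to its essentials, the trace above is the classic lock-ordering deadlock: each thread holds one of two monitors and waits for the other. A minimal illustration follows (monitors renamed for readability); the usual fixes are a global lock order, or not invoking callbacks such as recycle/notifyError while holding a lock.

{code}
// Minimal illustration of the pattern in the trace: two threads take the
// same two monitors in opposite orders and deadlock.
class DeadlockSketch {
    private final Object viewLock = new Object();     // stands in for 0xfa1478f0
    private final Object recyclerLock = new Object(); // stands in for 0xfa029768

    void consumerPath() { // like pool-25-thread-2
        synchronized (recyclerLock) {
            synchronized (viewLock) {
                // notifyError(...) in the real code
            }
        }
    }

    void ioThreadPath() { // like IOManager reader thread #1
        synchronized (viewLock) {
            synchronized (recyclerLock) {
                // recycle(...) in the real code
            }
        }
    }
}
{code}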
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620780#comment-14620780 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34273441 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -601,6 +636,11 @@ void markFinished() { } } + synchronized (accumulatorLock) { --- End diff -- Ignore this comment. The lock is good :-) Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620756#comment-14620756 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34272101 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { if (!isConnected) { log.debug(s"Dropping message $message because the TaskManager is currently " + "not connected to a JobManager.") -} +} else { -// we order the messages by frequency, to make sure the code paths for matching -// are as short as possible -message match { + // we order the messages by frequency, to make sure the code paths for matching + // are as short as possible + message match { + +// tell the task about the availability of a new input partition +case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) => + updateTaskInputPartitions(executionID, List((resultID, partitionInfo))) + +// tell the task about the availability of some new input partitions +case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) => + updateTaskInputPartitions(executionID, partitionInfos) + +// discards intermediate result partitions of a task execution on this TaskManager +case FailIntermediateResultPartitions(executionID) => + log.info("Discarding the results produced by task execution " + executionID) + if (network.isAssociated) { +try { + network.getPartitionManager.releasePartitionsProducedBy(executionID) +} catch { + case t: Throwable => killTaskManagerFatal( +"Fatal leak: Unable to release intermediate result partition data", t) +} + } - // tell the task about the availability of a new input partition - case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) => -updateTaskInputPartitions(executionID, List((resultID, partitionInfo))) +// notifies the TaskManager that the state of a task has changed. +// the TaskManager informs the JobManager and cleans up in case the transition +// was into a terminal state, or in case the JobManager cannot be informed of the +// state transition - // tell the task about the availability of some new input partitions - case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) => -updateTaskInputPartitions(executionID, partitionInfos) +case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) => - // discards intermediate result partitions of a task execution on this TaskManager - case FailIntermediateResultPartitions(executionID) => -log.info("Discarding the results produced by task execution " + executionID) -if (network.isAssociated) { - try { - network.getPartitionManager.releasePartitionsProducedBy(executionID) - } catch { -case t: Throwable => killTaskManagerFatal( -"Fatal leak: Unable to release intermediate result partition data", t) - } -} + // we receive these from our tasks and forward them to the JobManager --- End diff -- Here is a lot of changed code that was seemingly edited without need (has nothing to do with the accumulators). Since that is pretty sensitive code, I feel very hesitant to commit these massive edits. What was the reason for these changes in the first place? 
Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34272101 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { if (!isConnected) { log.debug(s"Dropping message $message because the TaskManager is currently " + "not connected to a JobManager.") -} +} else { -// we order the messages by frequency, to make sure the code paths for matching -// are as short as possible -message match { + // we order the messages by frequency, to make sure the code paths for matching + // are as short as possible + message match { + +// tell the task about the availability of a new input partition +case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) => + updateTaskInputPartitions(executionID, List((resultID, partitionInfo))) + +// tell the task about the availability of some new input partitions +case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) => + updateTaskInputPartitions(executionID, partitionInfos) + +// discards intermediate result partitions of a task execution on this TaskManager +case FailIntermediateResultPartitions(executionID) => + log.info("Discarding the results produced by task execution " + executionID) + if (network.isAssociated) { +try { + network.getPartitionManager.releasePartitionsProducedBy(executionID) +} catch { + case t: Throwable => killTaskManagerFatal( +"Fatal leak: Unable to release intermediate result partition data", t) +} + } - // tell the task about the availability of a new input partition - case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) => -updateTaskInputPartitions(executionID, List((resultID, partitionInfo))) +// notifies the TaskManager that the state of a task has changed. +// the TaskManager informs the JobManager and cleans up in case the transition +// was into a terminal state, or in case the JobManager cannot be informed of the +// state transition - // tell the task about the availability of some new input partitions - case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) => -updateTaskInputPartitions(executionID, partitionInfos) +case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) => - // discards intermediate result partitions of a task execution on this TaskManager - case FailIntermediateResultPartitions(executionID) => -log.info("Discarding the results produced by task execution " + executionID) -if (network.isAssociated) { - try { - network.getPartitionManager.releasePartitionsProducedBy(executionID) - } catch { -case t: Throwable => killTaskManagerFatal( -"Fatal leak: Unable to release intermediate result partition data", t) - } -} + // we receive these from our tasks and forward them to the JobManager --- End diff -- Here is a lot of changed code that was seemingly edited without need (has nothing to do with the accumulators). Since that is pretty sensitive code, I feel very hesitant to commit these massive edits. What was the reason for these changes in the first place? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620773#comment-14620773 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34273000 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -131,6 +133,35 @@ private SerializedValueStateHandle? operatorState; + /* Lock for updating the accumulators atomically. */ --- End diff -- Why not follow the style of the remaining class, with respect to empty lines between fields? Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34273000 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -131,6 +133,35 @@ private SerializedValueStateHandle? operatorState; + /* Lock for updating the accumulators atomically. */ --- End diff -- Why not follow the style of the remaining class, with respect to empty lines between fields? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2280) GenericTypeComparator.compare() does not respect ascending flag
[ https://issues.apache.org/jira/browse/FLINK-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620081#comment-14620081 ] ASF GitHub Bot commented on FLINK-2280: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/894#issuecomment-119865296 Good catch! +1 to merge GenericTypeComparator.compare() does not respect ascending flag --- Key: FLINK-2280 URL: https://issues.apache.org/jira/browse/FLINK-2280 Project: Flink Issue Type: Bug Components: Core Affects Versions: 0.9 Reporter: Fabian Hueske Assignee: Fabian Hueske Priority: Critical Fix For: 0.9.1 The {{GenericTypeComparator.compare()}} method does not respect the {{ascending}} flag that is used for inverted sorting. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2240) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620119#comment-14620119 ] ASF GitHub Bot commented on FLINK-2240: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/888#issuecomment-119873635 Nice, thank you. I will try to take a look soon... Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join Key: FLINK-2240 URL: https://issues.apache.org/jira/browse/FLINK-2240 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Assignee: Chengxiang Li Priority: Minor In Hybrid-Hash-Join, when the small table does not fit into memory, part of the small table data is spilled to disk, and the counterpart partitions of the big table are spilled to disk in the probe phase as well. If we build a BloomFilter while spilling the small table to disk during the build phase, and use it to filter the big table records that would otherwise be spilled to disk, we may greatly reduce the spilled big table file size and save the disk IO cost for writing and further reading. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2240] Use BloomFilter to filter probe r...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/888#issuecomment-119873635 Nice, thank you. I will try to take a look soon... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
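As a rough illustration of the FLINK-2240 idea (hedged sketch, not Flink's actual hash-join code): key hashes of a spilled build-side partition go into a Bloom filter, and in the probe phase any record whose key is definitely absent is dropped instead of spilled. False positives only cost an unnecessary spill, never a wrong join result.

{code}
import java.util.BitSet;

// Illustrative Bloom filter for spilled build-side keys. mightContain()
// returning false means the key is definitely not in the partition, so the
// probe-side record can be dropped instead of spilled to disk.
class SpillBloomFilterSketch {
    private final BitSet bits;
    private final int size;

    SpillBloomFilterSketch(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    private int h1(int keyHash) { return (keyHash & 0x7FFFFFFF) % size; }
    private int h2(int keyHash) { return ((keyHash * 0x9E3779B9) & 0x7FFFFFFF) % size; }

    void addBuildSideKey(int keyHash) {    // build phase, while spilling
        bits.set(h1(keyHash));
        bits.set(h2(keyHash));
    }

    boolean mightContain(int keyHash) {    // probe phase, before spilling
        return bits.get(h1(keyHash)) && bits.get(h2(keyHash));
    }
}
{code}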
[jira] [Commented] (FLINK-2293) Division by Zero Exception
[ https://issues.apache.org/jira/browse/FLINK-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620123#comment-14620123 ] Andra Lungu commented on FLINK-2293: I should not have tried anything yesterday... I was too tired to make things work. The flink version on the cluster was updated; however, I forgot to update the jar. I am doing this as we speak and will keep you posted once the job finishes. Sorry! Division by Zero Exception -- Key: FLINK-2293 URL: https://issues.apache.org/jira/browse/FLINK-2293 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 0.9, 0.10 Reporter: Andra Lungu Assignee: Stephan Ewen Priority: Critical Fix For: 0.10, 0.9.1 I am basically running an algorithm that simulates a Gather Sum Apply Iteration that performs Triangle Count (Why simulate it? Because you just need a superstep - useless overhead if you use the runGatherSumApply function in Graph). What happens, at a high level: 1). Select neighbors with ID greater than the one corresponding to the current vertex; 2). Propagate the received values to neighbors with higher ID; 3). Compute the number of triangles by checking whether trgVertex.getValue().get(srcVertex.getId()); As you can see, I *do not* perform any division at all; code is here: https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java Now for small graphs, 50MB max, the computation finishes nicely with the correct result. For a 10GB graph, however, I got this: java.lang.ArithmeticException: / by zero at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:836) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:819) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544) at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:722) see the full log here: https://gist.github.com/andralungu/984774f6348269df7951 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2280) GenericTypeComparator.compare() does not respect ascending flag
[ https://issues.apache.org/jira/browse/FLINK-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620130#comment-14620130 ] ASF GitHub Bot commented on FLINK-2280: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/894#issuecomment-119875351 Merging GenericTypeComparator.compare() does not respect ascending flag --- Key: FLINK-2280 URL: https://issues.apache.org/jira/browse/FLINK-2280 Project: Flink Issue Type: Bug Components: Core Affects Versions: 0.9 Reporter: Fabian Hueske Assignee: Fabian Hueske Priority: Critical Fix For: 0.9.1 The {{GenericTypeComparator.compare()}} method does not respect the {{ascending}} flag that is used for inverted sorting. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-1239) Fix iteration example getting stuck with large input
[ https://issues.apache.org/jira/browse/FLINK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-1239. - Resolution: Not A Problem Fix iteration example getting stuck with large input Key: FLINK-1239 URL: https://issues.apache.org/jira/browse/FLINK-1239 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gábor Hermann Assignee: Gábor Hermann When running the streaming iteration example with buffer timeout set to 0 (meaning the StreamRecordWriter gets flushed after every emit in every task), the iteration gets stuck at flushing the output after emitting a record. This happens only with larger numbers of inputs (e.g. 1000 records to iterate on). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2261) Remove reduce/aggregation from DataStream
[ https://issues.apache.org/jira/browse/FLINK-2261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-2261. - Resolution: Fixed Remove reduce/aggregation from DataStream - Key: FLINK-2261 URL: https://issues.apache.org/jira/browse/FLINK-2261 Project: Flink Issue Type: Improvement Components: Java API, Scala API, Streaming Affects Versions: 0.10 Reporter: Gyula Fora Assignee: Gyula Fora Currently we have reduce and aggregation methods for non-grouped DataStreams as well, which will produce local aggregates depending on the parallelism of the operator. This behaviour is neither intuitive nor useful as it only produces sensible results if the user specifically sets the parallelism to 1 which should not be encouraged. I would like to remove these methods from the DataStream api and only keep it for GroupedDataStreams and WindowedDataStream where the aggregation is either executed per-key or per-window. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
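For readers of the archive, a hedged sketch against the 0.9-era streaming API (method names may differ slightly across versions) of what remains supported after this change: aggregations on grouped streams, where the result is well defined at any parallelism, versus the removed non-grouped variant that produced parallelism-dependent partial aggregates.

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GroupedAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> words = env.fromElements(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 3));

        // per-key aggregation: stays in the API, meaningful at any parallelism
        words.groupBy(0).sum(1).print();

        // words.sum(1) directly on the non-grouped stream is the kind of call
        // this issue removes: it yielded per-parallel-instance partial sums
        env.execute();
    }
}
{code}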
[jira] [Created] (FLINK-2334) IOException: Channel to path could not be opened
David Heller created FLINK-2334: --- Summary: IOException: Channel to path could not be opened Key: FLINK-2334 URL: https://issues.apache.org/jira/browse/FLINK-2334 Project: Flink Issue Type: Bug Affects Versions: 0.9 Environment: local and cluster environment; Linux and MacOS Reporter: David Heller Priority: Minor We've encountered an IOException due to missing temporary files (see stacktrace at the bottom). It occurred both in local and cluster execution and on MacOS as well as on Linux. Data size does not seem to be the reason: the error occurred on a 6.5GB dataset as well as on a small 400MB dataset. Our code uses Bulk iterations and the suspicion is that cached build-side files are accidentally removed too early. As far as we observed it, the exception always happens in an iteration later than the first one (mostly iteration 2). Interestingly, on one occasion the error disappeared consistently when setting the number of maximum iterations higher (from 2 to 6). On another occasion, the exception appeared when adding a simple map operator at the end (holding the identity function). Generally, the error is quite hard to reproduce. The stacktrace: java.io.IOException: Channel to path '/var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel' could not be opened. at org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:61) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.<init>(AsynchronousFileIOChannel.java:86) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.<init>(AsynchronousBulkBlockReader.java:46) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.<init>(AsynchronousBulkBlockReader.java:39) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBulkBlockChannelReader(IOManagerAsync.java:263) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:751) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508) at org.apache.flink.runtime.operators.hash.ReOpenableMutableHashTable.prepareNextPartition(ReOpenableMutableHashTable.java:167) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544) at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) at org.apache.flink.runtime.operators.AbstractCachedBuildSideMatchDriver.run(AbstractCachedBuildSideMatchDriver.java:155) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139) at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.FileNotFoundException: /var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel (No such file or directory) at java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:124) at org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:57) ... 16 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2335) Rework iteration construction in StreamGraph
Gyula Fora created FLINK-2335: - Summary: Rework iteration construction in StreamGraph Key: FLINK-2335 URL: https://issues.apache.org/jira/browse/FLINK-2335 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora Currently the nodes representing the extra sinks and sources are incrementally added to the streamgraph when the user creates the iterative parts of the program. This makes it difficult to enforce different partitioning schemes on the feedback edges and also makes it virtually impossible to handle multiple iteration heads with different parallelism. The actual nodes in the streamgraph for the iteration sinks/sources should only be created when the program is finalized, after the user calls execute and before we create the jobgraph. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2336) ArrayIndexOutOfBoundsException in TypeExtractor when mapping
William Saar created FLINK-2336: --- Summary: ArrayIndexOutOfBoundsException in TypeExtractor when mapping Key: FLINK-2336 URL: https://issues.apache.org/jira/browse/FLINK-2336 Project: Flink Issue Type: Bug Affects Versions: master Reporter: William Saar The line that causes this is DataStream<O> outputStream = insideIterationStream.filter(outputFilter).map(m -> m.outputMessage); Problem occurs both when compiled using Javac and Eclipse's JDT compiler (in an environment where simple lambda type tests work) Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(Unknown Source) at java.util.ArrayList.get(Unknown Source) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370) at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254) at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91) at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2336) ArrayIndexOutOfBoundsException in TypeExtractor when mapping
[ https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620345#comment-14620345 ] William Saar commented on FLINK-2336: - When replacing the lambda with a MapFunction instance, one gets the following clearer error message: Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'O' in 'class ...' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:473) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:361) ArrayIndexOutOfBoundsException in TypeExtractor when mapping --- Key: FLINK-2336 URL: https://issues.apache.org/jira/browse/FLINK-2336 Project: Flink Issue Type: Bug Affects Versions: master Reporter: William Saar The line that causes this is DataStream<O> outputStream = insideIterationStream.filter(outputFilter).map(m -> m.outputMessage); Problem occurs both when compiled using Javac and Eclipse's JDT compiler (in an environment where simple lambda type tests work) Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(Unknown Source) at java.util.ArrayList.get(Unknown Source) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370) at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254) at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91) at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
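The clearer error message points at the usual workaround until the extractor handles this case: replace the lambda with a function whose concrete type arguments appear in the class signature. A hedged fragment follows; Message, Output, outputFilter and insideIterationStream are stand-ins for the reporter's types, not real identifiers from the codebase.

{code}
import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical stand-in types from the report; the point is that the
// concrete type arguments <Message, Output> are visible to the
// TypeExtractor, unlike an erased lambda or a generic TypeVariable 'O'.
DataStream<Output> outputStream = insideIterationStream
        .filter(outputFilter)
        .map(new MapFunction<Message, Output>() {
            @Override
            public Output map(Message m) {
                return m.outputMessage;
            }
        });
{code}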
[GitHub] flink pull request: [FLINK-2280] GenericTypeComparator.compare() r...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/894#issuecomment-119865296 Good catch! +1 to merge --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-2333) Stream Data Sink that periodically rolls files
Stephan Ewen created FLINK-2333: --- Summary: Stream Data Sink that periodically rolls files Key: FLINK-2333 URL: https://issues.apache.org/jira/browse/FLINK-2333 Project: Flink Issue Type: New Feature Components: Streaming Affects Versions: 0.10 Reporter: Stephan Ewen It would be useful to have a file data sink for streams that starts a new file every n elements or records. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
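One way such a sink's rolling logic could look, independent of Flink's sink interfaces (a plain-Java sketch under the assumption that rolling is by record count; the path scheme is illustrative):

{code}
import java.io.FileWriter;
import java.io.IOException;

// Illustrative rolling writer: closes the current file and opens a new one
// every rollEvery records. A real sink would wrap this in Flink's sink
// interface and add flushing and failure handling.
class RollingFileWriterSketch {
    private final String basePath;
    private final long rollEvery;
    private FileWriter out;
    private long count;
    private int part;

    RollingFileWriterSketch(String basePath, long rollEvery) throws IOException {
        this.basePath = basePath;
        this.rollEvery = rollEvery;
        this.out = new FileWriter(basePath + ".part-0");
    }

    void write(String record) throws IOException {
        if (count > 0 && count % rollEvery == 0) {
            out.close();                                   // roll the file
            out = new FileWriter(basePath + ".part-" + (++part));
        }
        out.write(record);
        out.write('\n');
        count++;
    }
}
{code}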
[jira] [Commented] (FLINK-2293) Division by Zero Exception
[ https://issues.apache.org/jira/browse/FLINK-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620116#comment-14620116 ] Stephan Ewen commented on FLINK-2293: - [~andralungu] The line numbers in your stack trace (where the exception occurs, and the frame below as well) correspond to empty lines or comment lines in the source code of the latest master. See here: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java#L836 Are you sure you are using the code after I added the patch? Division by Zero Exception -- Key: FLINK-2293 URL: https://issues.apache.org/jira/browse/FLINK-2293 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 0.9, 0.10 Reporter: Andra Lungu Assignee: Stephan Ewen Priority: Critical Fix For: 0.10, 0.9.1 I am basically running an algorithm that simulates a Gather Sum Apply Iteration that performs Triangle Count (Why simulate it? Because you just need a superstep - useless overhead if you use the runGatherSumApply function in Graph). What happens, at a high level: 1). Select neighbors with ID greater than the one corresponding to the current vertex; 2). Propagate the received values to neighbors with higher ID; 3). Compute the number of triangles by checking whether trgVertex.getValue().get(srcVertex.getId()); As you can see, I *do not* perform any division at all; code is here: https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java Now for small graphs, 50MB max, the computation finishes nicely with the correct result. For a 10GB graph, however, I got this: java.lang.ArithmeticException: / by zero at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:836) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:819) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544) at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:722) see the full log here: https://gist.github.com/andralungu/984774f6348269df7951 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2293) Division by Zero Exception
[ https://issues.apache.org/jira/browse/FLINK-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620117#comment-14620117 ] Stephan Ewen commented on FLINK-2293: - You can look at the beginning of the log; it writes the code revision out. Can you post which one it is? Division by Zero Exception -- Key: FLINK-2293 URL: https://issues.apache.org/jira/browse/FLINK-2293 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 0.9, 0.10 Reporter: Andra Lungu Assignee: Stephan Ewen Priority: Critical Fix For: 0.10, 0.9.1 I am basically running an algorithm that simulates a Gather Sum Apply Iteration that performs Triangle Count (Why simulate it? Because you just need a superstep - useless overhead if you use the runGatherSumApply function in Graph). What happens, at a high level: 1). Select neighbors with ID greater than the one corresponding to the current vertex; 2). Propagate the received values to neighbors with higher ID; 3). Compute the number of triangles by checking whether trgVertex.getValue().get(srcVertex.getId()); As you can see, I *do not* perform any division at all; code is here: https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java Now for small graphs, 50MB max, the computation finishes nicely with the correct result. For a 10GB graph, however, I got this: java.lang.ArithmeticException: / by zero at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:836) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:819) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544) at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:722) see the full log here: https://gist.github.com/andralungu/984774f6348269df7951 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2280] GenericTypeComparator.compare() r...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/894 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora resolved FLINK-1421. --- Resolution: Done Implement a SAMOA Adapter for Flink Streaming - Key: FLINK-1421 URL: https://issues.apache.org/jira/browse/FLINK-1421 Project: Flink Issue Type: New Feature Components: Streaming Reporter: Paris Carbone Assignee: Paris Carbone Original Estimate: 336h Remaining Estimate: 336h Yahoo's Samoa is an experimental incremental machine learning library that builds on an abstract compositional data streaming model to write streaming algorithms. The task is to provide an adapter from SAMOA topologies to Flink-streaming job graphs in order to support Flink as a backend engine for SAMOA tasks. A startup guide can be viewed here: https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub The main working branch of the adapter: https://github.com/senorcarbone/samoa/tree/flink-integration -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2328) Applying more than one transformation on an IterativeDataStream fails
[ https://issues.apache.org/jira/browse/FLINK-2328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-2328: - Assignee: Gyula Fora Applying more than one transformation on an IterativeDataStream fails - Key: FLINK-2328 URL: https://issues.apache.org/jira/browse/FLINK-2328 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora Currently the user cannot apply more than one transformation on the IterativeDataStream directly. It fails because instead of creating one iteration source and connecting it to the operators, it will try to create two iteration sources, which fails on the shared broker slot. A workaround is to use a no-op mapper on the iterative stream and to apply the two transformations on that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
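The workaround reads roughly as follows (hedged sketch against the 0.9-era API; the surrounding program is elided and `input` is assumed to be an existing DataStream): a single no-op mapper consumes the IterativeDataStream, and the two real transformations hang off the mapper's output.

{code}
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;

// input is assumed to be an existing DataStream<Long>
IterativeDataStream<Long> it = input.iterate();

// the single consumer of the iteration source: an identity / no-op mapper
DataStream<Long> forwarded = it.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        return value;
    }
});

// both transformations attach to the mapper, not to the iterative stream
DataStream<Long> evens = forwarded.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) {
        return value % 2 == 0;
    }
});
DataStream<Long> incremented = forwarded.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        return value + 1;
    }
});
{code}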
[jira] [Updated] (FLINK-2328) Applying more than one transformation on an IterativeDataStream fails
[ https://issues.apache.org/jira/browse/FLINK-2328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-2328: -- Component/s: Streaming Applying more than one transformation on an IterativeDataStream fails - Key: FLINK-2328 URL: https://issues.apache.org/jira/browse/FLINK-2328 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora Currently the user cannot apply more than one transformation on the IterativeDataStream directly. It fails because instead of creating one iteration source and connecting it to the operators, it will try to create two iteration sources, which fails on the shared broker slot. A workaround is to use a no-op mapper on the iterative stream and to apply the two transformations on that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2235) Local Flink cluster allocates too much memory
[ https://issues.apache.org/jira/browse/FLINK-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620163#comment-14620163 ] ASF GitHub Bot commented on FLINK-2235: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/859#issuecomment-119883278 Typically, programs can allocate as much memory as they like. We only take a fraction of the free physical memory for the managed memory. We could also take only half of the physical memory. Or, alternatively, fail with an exception that the maximum memory for the JVM is not set (-Xmx is missing). In my opinion, it is ok to take a fraction of the physical memory for local execution. Local Flink cluster allocates too much memory - Key: FLINK-2235 URL: https://issues.apache.org/jira/browse/FLINK-2235 Project: Flink Issue Type: Bug Components: Local Runtime, TaskManager Affects Versions: 0.9 Environment: Oracle JDK: 1.6.0_65-b14-462 Eclipse Reporter: Maximilian Michels Priority: Minor When executing a Flink job locally, the task manager gets initialized with an insane amount of memory. After a quick look in the code it seems that the call to {{EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()}} returns a wrong estimate of the heap memory size. Moreover, the same user switched to Oracle JDK 1.8 and that made the error disappear. So I'm guessing this is some Java 1.6 quirk. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2235] fix calculation of free memory fo...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/859#issuecomment-119883278 Typically, programs can allocate as much memory as they like. We only take a fraction of the free physical memory for the managed memory. We could also take only half of the physical memory. Or, alternatively, fail with an exception stating that the maximum memory for the JVM is not set (-Xmx is missing). In my opinion, it is ok to take a fraction of the physical memory for local execution.
[jira] [Commented] (FLINK-2305) Add documentation about Storm compatibility layer
[ https://issues.apache.org/jira/browse/FLINK-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620102#comment-14620102 ] ASF GitHub Bot commented on FLINK-2305: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/884#issuecomment-119870990 Let's make it a separate issue. If there is a follow-up on these two issues, we can merge this, in my opinion. Add documentation about Storm compatibility layer Key: FLINK-2305 URL: https://issues.apache.org/jira/browse/FLINK-2305 Project: Flink Issue Type: Task Components: Documentation Reporter: Matthias J. Sax Assignee: Matthias J. Sax The Storm compatibility layer is currently not documented on the project web site.
[GitHub] flink pull request: [FLINK-2305] Add documentation about Storm comp...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/884#issuecomment-119870990 Let's make it a separate issue. If there is a follow-up on these two issues, we can merge this, in my opinion.
[jira] [Commented] (FLINK-2280) GenericTypeComparator.compare() does not respect ascending flag
[ https://issues.apache.org/jira/browse/FLINK-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620135#comment-14620135 ] ASF GitHub Bot commented on FLINK-2280: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/894 GenericTypeComparator.compare() does not respect ascending flag --- Key: FLINK-2280 URL: https://issues.apache.org/jira/browse/FLINK-2280 Project: Flink Issue Type: Bug Components: Core Affects Versions: 0.9 Reporter: Fabian Hueske Assignee: Fabian Hueske Priority: Critical Fix For: 0.9.1 The {{GenericTypeComparator.compare()}} method does not respect the {{ascending}} flag that is used for inverted sorting.
[jira] [Closed] (FLINK-2280) GenericTypeComparator.compare() does not respect ascending flag
[ https://issues.apache.org/jira/browse/FLINK-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-2280. Resolution: Fixed Fixed with 590c6d0644e59d0bc3b5b6f4869f91bc38864232 GenericTypeComparator.compare() does not respect ascending flag --- Key: FLINK-2280 URL: https://issues.apache.org/jira/browse/FLINK-2280 Project: Flink Issue Type: Bug Components: Core Affects Versions: 0.9 Reporter: Fabian Hueske Assignee: Fabian Hueske Priority: Critical Fix For: 0.9.1 The {{GenericTypeComparator.compare()}} method does not respect the {{ascending}} flag that is used for inverted sorting.
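The shape of the fix is easy to see in isolation. Below is a hedged sketch of the pattern — illustrative only, not the actual Flink patch; the real change lives in {{GenericTypeComparator.compare()}} and its class names differ from this simplified stand-in.

```java
// Simplified stand-in for a type comparator; names are hypothetical.
public class AscendingAwareComparator<T extends Comparable<T>> {

    private final boolean ascending;

    public AscendingAwareComparator(boolean ascending) {
        this.ascending = ascending;
    }

    public int compare(T first, T second) {
        int cmp = first.compareTo(second);
        // The reported bug: returning cmp unconditionally ignores the
        // flag. Descending (inverted) sorts must negate the result.
        return ascending ? cmp : -cmp;
    }
}
```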
[jira] [Updated] (FLINK-2338) Shut down Storm Topologies cleanly
[ https://issues.apache.org/jira/browse/FLINK-2338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated FLINK-2338: --- Summary: Shut down Storm Topologies cleanly (was: Shut down Storm Topologies clenaly) Shut down Storm Topologies cleanly Key: FLINK-2338 URL: https://issues.apache.org/jira/browse/FLINK-2338 Project: Flink Issue Type: Improvement Components: flink-contrib Reporter: Matthias J. Sax Assignee: Matthias J. Sax Priority: Minor Currently, it is not possible to stop a Flink streaming program in a clean way. Thus, emulating Storm's kill command is done the hard way, resulting in the following exception in the log:
org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Lost connection to JobManager akka://flink/user/jobmanager
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:169)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:205)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:195)
at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:116)
Caused by: java.lang.Exception: Lost connection to JobManager akka://flink/user/jobmanager
at org.apache.flink.runtime.client.JobClientActor.onReceive(JobClientActor.java:131)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
The exception is currently expected. However, a clean kill is preferable. This can be done once the new STOP signal is available (https://issues.apache.org/jira/browse/FLINK-2111).
[jira] [Updated] (FLINK-2162) Implement adaptive learning rate strategies for SGD
[ https://issues.apache.org/jira/browse/FLINK-2162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-2162: - Assignee: Ventura Del Monte Implement adaptive learning rate strategies for SGD --- Key: FLINK-2162 URL: https://issues.apache.org/jira/browse/FLINK-2162 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Ventura Del Monte Priority: Minor Labels: ML At the moment, the SGD implementation uses a simple adaptive learning rate strategy, {{adaptedLearningRate = initialLearningRate/sqrt(iterationNumber)}}, which makes the optimization algorithm sensitive to the setting of the {{initialLearningRate}}. If this value is chosen wrongly, then the SGD might become unstable. There are better ways to calculate the learning rate [1], such as Adagrad [3], Adadelta [4], SGD with momentum [5], and others [2]. They promise to result in more stable optimization algorithms that require less hyperparameter tweaking. It might be worthwhile to investigate these approaches. It might also be interesting to look at the implementation of Vowpal Wabbit [6]. Resources: [1] [http://imgur.com/a/Hqolp] [2] [http://cs.stanford.edu/people/karpathy/convnetjs/demo/trainers.html] [3] [http://www.jmlr.org/papers/volume12/duchi11a/duchi11a.pdf] [4] [http://www.matthewzeiler.com/pubs/googleTR2012/googleTR2012.pdf] [5] [http://www.willamette.edu/~gorr/classes/cs449/momrate.html] [6] [https://github.com/JohnLangford/vowpal_wabbit]
[jira] [Commented] (FLINK-2162) Implement adaptive learning rate strategies for SGD
[ https://issues.apache.org/jira/browse/FLINK-2162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620427#comment-14620427 ] Till Rohrmann commented on FLINK-2162: -- Great to hear [~ventura] :-) I've assigned the issue to you. Implement adaptive learning rate strategies for SGD --- Key: FLINK-2162 URL: https://issues.apache.org/jira/browse/FLINK-2162 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Ventura Del Monte Priority: Minor Labels: ML At the moment, the SGD implementation uses a simple adaptive learning rate strategy, {{adaptedLearningRate = initialLearningRate/sqrt(iterationNumber)}}, which makes the optimization algorithm sensitive to the setting of the {{initialLearningRate}}. If this value is chosen wrongly, then the SGD might become unstable. There are better ways to calculate the learning rate [1], such as Adagrad [3], Adadelta [4], SGD with momentum [5], and others [2]. They promise to result in more stable optimization algorithms that require less hyperparameter tweaking. It might be worthwhile to investigate these approaches. It might also be interesting to look at the implementation of Vowpal Wabbit [6]. Resources: [1] [http://imgur.com/a/Hqolp] [2] [http://cs.stanford.edu/people/karpathy/convnetjs/demo/trainers.html] [3] [http://www.jmlr.org/papers/volume12/duchi11a/duchi11a.pdf] [4] [http://www.matthewzeiler.com/pubs/googleTR2012/googleTR2012.pdf] [5] [http://www.willamette.edu/~gorr/classes/cs449/momrate.html] [6] [https://github.com/JohnLangford/vowpal_wabbit]
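To make the contrast concrete, here is an illustrative Java sketch of the current decay rule next to an Adagrad-style per-coordinate rate. This is hypothetical code, not Flink ML's implementation; `weights`, `gradient`, and `accumulatedSquaredGradients` are assumed double arrays of equal length.

```java
// Current strategy: one global rate that decays with the iteration count;
// results are sensitive to the choice of initialLearningRate.
double adaptedLearningRate = initialLearningRate / Math.sqrt(iterationNumber);

// Adagrad-style alternative: each coordinate is scaled by its own
// accumulated squared gradients, reducing sensitivity to the initial rate.
final double epsilon = 1e-8; // guards against division by zero
for (int i = 0; i < weights.length; i++) {
    accumulatedSquaredGradients[i] += gradient[i] * gradient[i];
    weights[i] -= initialLearningRate
            / (Math.sqrt(accumulatedSquaredGradients[i]) + epsilon)
            * gradient[i];
}
```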
[jira] [Created] (FLINK-2337) Multiple SLF4J bindings using Storm compatibility layer
Matthias J. Sax created FLINK-2337: -- Summary: Multiple SLF4J bindings using Storm compatibility layer Key: FLINK-2337 URL: https://issues.apache.org/jira/browse/FLINK-2337 Project: Flink Issue Type: Bug Components: flink-contrib Reporter: Matthias J. Sax Assignee: Matthias J. Sax Priority: Minor Storm depends on logback as its SLF4J implementation, but Flink uses log4j. The log shows the following conflict:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/cicero/.m2/repository/ch/qos/logback/logback-classic/1.0.13/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/cicero/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
To fix this, logback needs to be excluded from the Storm dependencies (a sketch follows below).
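A hedged sketch of the proposed POM change — the Storm coordinates and version property here are illustrative assumptions; the actual dependency declaration in flink-contrib may differ:

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <exclusions>
        <!-- Exclude logback so that only Flink's slf4j-log4j12 binding
             remains on the classpath. -->
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```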
[jira] [Created] (FLINK-2338) Shut down Storm Topologies clenaly
Matthias J. Sax created FLINK-2338: -- Summary: Shut down Storm Topologies clenaly Key: FLINK-2338 URL: https://issues.apache.org/jira/browse/FLINK-2338 Project: Flink Issue Type: Improvement Components: flink-contrib Reporter: Matthias J. Sax Assignee: Matthias J. Sax Priority: Minor Currently, it is not possible to stop a Flink streaming program in a clean way. Thus, emulating Storm's kill command is done the hard way, resulting in the following exception in the log:
org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Lost connection to JobManager akka://flink/user/jobmanager
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:169)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:205)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:195)
at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:116)
Caused by: java.lang.Exception: Lost connection to JobManager akka://flink/user/jobmanager
at org.apache.flink.runtime.client.JobClientActor.onReceive(JobClientActor.java:131)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
The exception is currently expected. However, a clean kill is preferable. This can be done once the new STOP signal is available (https://issues.apache.org/jira/browse/FLINK-2111).
[jira] [Commented] (FLINK-2305) Add documentation about Storm compatibility layer
[ https://issues.apache.org/jira/browse/FLINK-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620418#comment-14620418 ] ASF GitHub Bot commented on FLINK-2305: --- Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/884#issuecomment-119943363 I just created two JIRAs: - SLF4J: https://issues.apache.org/jira/browse/FLINK-2337 - JobExecutionException: https://issues.apache.org/jira/browse/FLINK-2338 This PR should be ready to be merged. Add documentation about Storm compatibility layer Key: FLINK-2305 URL: https://issues.apache.org/jira/browse/FLINK-2305 Project: Flink Issue Type: Task Components: Documentation Reporter: Matthias J. Sax Assignee: Matthias J. Sax The Storm compatibility layer is currently not documented on the project web site.
[GitHub] flink pull request: [FLINK-2305] Add documentation about Storm comp...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/884#issuecomment-119943363 I just created two JIRAs: - SLF4J: https://issues.apache.org/jira/browse/FLINK-2337 - JobExecutionException: https://issues.apache.org/jira/browse/FLINK-2338 This PR should be ready to be merged.
[jira] [Commented] (FLINK-2336) ArrayIndexOutOfBoundsException in TypeExtractor when mapping
[ https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620389#comment-14620389 ] Stephan Ewen commented on FLINK-2336: - Can you post a simplified data type that reproduces this problem? Then we can make a patch to fix it! ArrayIndexOutOfBoundsException in TypeExtractor when mapping --- Key: FLINK-2336 URL: https://issues.apache.org/jira/browse/FLINK-2336 Project: Flink Issue Type: Bug Affects Versions: master Reporter: William Saar The line that causes this is DataStream<O> outputStream = insideIterationStream.filter(outputFilter).map(m -> m.outputMessage); Problem occurs both when compiled using Javac and Eclipse's JDT compiler (in an environment where simple lambda type tests work)
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(Unknown Source)
at java.util.ArrayList.get(Unknown Source)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254)
at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605)
[GitHub] flink pull request: [FLINK-2235] fix calculation of free memory fo...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/859#issuecomment-119891011 Okay, from skimming over some Oracle docs, it seems the default max heap is 1/4 of the physical memory. Let's use that.
[jira] [Commented] (FLINK-2235) Local Flink cluster allocates too much memory
[ https://issues.apache.org/jira/browse/FLINK-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620197#comment-14620197 ] ASF GitHub Bot commented on FLINK-2235: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/859#issuecomment-119891011 Okay, from skimming over some Oracle docs, it seems the default max heap is 1/4 of the physical memory. Let's use that. Local Flink cluster allocates too much memory - Key: FLINK-2235 URL: https://issues.apache.org/jira/browse/FLINK-2235 Project: Flink Issue Type: Bug Components: Local Runtime, TaskManager Affects Versions: 0.9 Environment: Oracle JDK: 1.6.0_65-b14-462 Eclipse Reporter: Maximilian Michels Priority: Minor When executing a Flink job locally, the task manager gets initialized with an insane amount of memory. After a quick look in the code it seems that the call to {{EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()}} returns a wrong estimate of the heap memory size. Moreover, the same user switched to Oracle JDK 1.8 and that made the error disappear. So I'm guessing this is some Java 1.6 quirk.
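A minimal sketch of the heuristic settled on above — take a quarter of the physical memory when no explicit heap bound is configured. This is illustrative code, not Flink's implementation; the com.sun.management bean is a HotSpot-specific assumption, and the class name is hypothetical.

```java
import java.lang.management.ManagementFactory;

public final class LocalMemoryHeuristic {

    /** Fraction of physical memory to use when -Xmx is not set. */
    private static final double FRACTION = 0.25;

    public static long defaultHeapSizeBytes() {
        // JDK-specific bean, available on HotSpot JVMs.
        com.sun.management.OperatingSystemMXBean os =
                (com.sun.management.OperatingSystemMXBean)
                        ManagementFactory.getOperatingSystemMXBean();
        return (long) (os.getTotalPhysicalMemorySize() * FRACTION);
    }
}
```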
[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/895 [FLINK-2008][FLINK-2296] Fix checkpoint committing KafkaITCase [wip] You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink2008 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/895.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #895 commit 0d5773a56acb9f6d7592de9bf5a93f04e6600ca1 Author: Robert Metzger rmetz...@apache.org Date: 2015-06-29T14:52:38Z [FLINK-2008][FLINK-2296] Fix checkpoint committing KafkaITCase
[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times
[ https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620459#comment-14620459 ] ASF GitHub Bot commented on FLINK-2008: --- GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/895 [FLINK-2008][FLINK-2296] Fix checkpoint committing KafkaITCase [wip] You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink2008 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/895.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #895 commit 0d5773a56acb9f6d7592de9bf5a93f04e6600ca1 Author: Robert Metzger rmetz...@apache.org Date: 2015-06-29T14:52:38Z [FLINK-2008][FLINK-2296] Fix checkpoint committing KafkaITCase PersistentKafkaSource is sometimes emitting tuples multiple times - Key: FLINK-2008 URL: https://issues.apache.org/jira/browse/FLINK-2008 Project: Flink Issue Type: Bug Components: Kafka Connector, Streaming Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger The PersistentKafkaSource is expected to emit records exactly once. Two test cases of the KafkaITCase are sporadically failing because records are emitted multiple times. Affected tests: {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been changed manually in ZK:
{code}
java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2]
{code}
{{brokerFailureTest()}} also fails:
{code}
05/13/2015 08:13:16 Custom source - Stream Sink(1/1) switched to FAILED
java.lang.AssertionError: Received tuple with value 21 twice
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertFalse(Assert.java:64)
at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:745)
{code}
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620523#comment-14620523 ] ASF GitHub Bot commented on FLINK-2292: --- GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/896 [FLINK-2292][FLINK-1573] add live per-task accumulators This refactors the accumulators to accumulate per task execution. The accumulators are reported from the task managers periodically to the job manager via the Heartbeat message. If the execution contains chained tasks, the accumulators are chained as well. The final accumulator results are reported via the UpdateTaskExecutionState message. The accumulators are now saved in the Execution within the ExecutionGraph. This makes the AccumulatorManager obsolete. It has been removed for now. In the future, we might introduce some caching for the web frontend visualization. Two types of accumulators are available: - external (user-defined via the RuntimeContext) - internal (flink metrics defined in the invocables) The internal (built-in) metrics are targeted at users who want to monitor their programs, e.g. through the job manager's web frontend. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink live-accumulators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/896.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #896 commit 7cec1236f087e72b40022bf02a6dbb12d74acbac Author: Maximilian Michels m...@apache.org Date: 2015-07-08T07:23:42Z [FLINK-2292][FLINK-1573] add live per-task accumulators This refactors the accumulators to accumulate per task execution. The accumulators are reported from the task managers periodically to the job manager via the Heartbeat message. If the execution contains chained tasks, the accumulators are chained as well. The final accumulator results are reported via the UpdateTaskExecutionState message. The accumulators are now saved in the Execution within the ExecutionGraph. This makes the AccumulatorManager obsolete. It has been removed for now. In the future, we might introduce some caching for the web frontend visualization. Two types of accumulators are available: - external (user-defined via the RuntimeContext) - internal (flink metrics defined in the invocables) The internal (built-in) metrics are targeted at users who want to monitor their programs, e.g. through the job manager's web frontend. Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/896 [FLINK-2292][FLINK-1573] add live per-task accumulators This refactors the accumulators to accumulate per task execution. The accumulators are reported from the task managers periodically to the job manager via the Heartbeat message. If the execution contains chained tasks, the accumulators are chained as well. The final accumulator results are reported via the UpdateTaskExecutionState message. The accumulators are now saved in the Execution within the ExecutionGraph. This makes the AccumulatorManager obsolete. It has been removed for now. In the future, we might introduce some caching for the web frontend visualization. Two types of accumulators are available: - external (user-defined via the RuntimeContext) - internal (flink metrics defined in the invocables) The internal (built-in) metrics are targeted at users who want to monitor their programs, e.g. through the job manager's web frontend. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink live-accumulators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/896.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #896 commit 7cec1236f087e72b40022bf02a6dbb12d74acbac Author: Maximilian Michels m...@apache.org Date: 2015-07-08T07:23:42Z [FLINK-2292][FLINK-1573] add live per-task accumulators This refactors the accumulators to accumulate per task execution. The accumulators are reported from the task managers periodically to the job manager via the Heartbeat message. If the execution contains chained tasks, the accumulators are chained as well. The final accumulator results are reported via the UpdateTaskExecutionState message. The accumulators are now saved in the Execution within the ExecutionGraph. This makes the AccumulatorManager obsolete. It has been removed for now. In the future, we might introduce some caching for the web frontend visualization. Two types of accumulators are available: - external (user-defined via the RuntimeContext) - internal (flink metrics defined in the invocables) The internal (built-in) metrics are targeted at users who want to monitor their programs, e.g. through the job manager's web frontend.
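For context, the "external" accumulators mentioned in the PR description are the ones a user registers via the RuntimeContext. A hedged sketch of that user-side view follows (API roughly as of this era; details may have changed, and the class name is hypothetical):

```java
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// With this PR, the counter's value travels to the job manager with the
// periodic heartbeat, so it can be observed while the job is still running.
public class CountingMapper extends RichFlatMapFunction<String, String> {

    private final LongCounter processed = new LongCounter();

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator("processed-records", processed);
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        processed.add(1L);
        out.collect(value);
    }
}
```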
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34272308
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1017,6 +1040,9 @@ object TaskManager {
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds

+  /* Interval to send accumulators to the job manager */
+  val ACCUMULATOR_REPORT_INTERVAL: FiniteDuration = 10 seconds
--- End diff --
This variable is never used.
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620762#comment-14620762 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34272308
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1017,6 +1040,9 @@ object TaskManager {
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds

+  /* Interval to send accumulators to the job manager */
+  val ACCUMULATOR_REPORT_INTERVAL: FiniteDuration = 10 seconds
--- End diff --
This variable is never used. Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34273136
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -601,6 +636,11 @@ void markFinished() {
 	}
 }

+ synchronized (accumulatorLock) {
--- End diff --
This lock seems redundant; nothing relies on these two being in sync.
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620819#comment-14620819 ] ASF GitHub Bot commented on FLINK-2292: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120065197 This pattern seems to repeat in many places:
```java
AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
AccumulatorRegistry.Internal internalRegistry = accumulatorRegistry.getInternal();
this.recordsOutCounter = internalRegistry.createLongCounter(AccumulatorRegistry.Internal.NUM_RECORDS_OUT);
this.bytesOutCounter = internalRegistry.createLongCounter(AccumulatorRegistry.Internal.NUM_BYTES_OUT);

AccumulatorRegistry.External externalRegistry = accumulatorRegistry.getExternal();
this.accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
externalRegistry.setMap(this.accumulatorMap);
```
I think the code would be simpler if the registry simply always had a created map for internal and external accumulators. Also, a reporter object would help. The code would then look like:
```java
AccumulatorRegistry accumulators = getEnvironment().getAccumulatorRegistry();
ReadWriteReporter reporter = accumulators.getReadWriteReporter();
writer.setStatsReporter(reporter);
this.accumulatorMap = accumulators.getUserMap();
```
Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time.
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-120015233 Alright, I've opened a JIRA for this: https://issues.apache.org/jira/browse/FLINK-2340
[jira] [Commented] (FLINK-2329) Refactor RPCs from within the ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-2329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620638#comment-14620638 ] ASF GitHub Bot commented on FLINK-2329: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/893#discussion_r34264492 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java --- @@ -43,8 +41,8 @@ /** The lock on which to synchronize allocations and failure state changes */ private final Object instanceLock = new Object(); - /** The actor ref to the task manager represented by this taskManager. */ - private final ActorRef taskManager; + /** The instacne gateway to communicate with the instance */ --- End diff -- typo Refactor RPCs from within the ExecutionGraph Key: FLINK-2329 URL: https://issues.apache.org/jira/browse/FLINK-2329 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 0.10 Currently, we store an {{ActorRef}} of the TaskManager into an {{Instance}} object. This {{ActorRef}} is used from within {{Executions}} to interact with the {{TaskManager}}. This is not a nice abstraction since it does not hide implementation details. Since we need to add a leader session ID to messages sent by the {{Executions}} in order to support high availability, we would need to make the leader session ID available to the {{Execution}}. A better solution seems to be to replace the direct {{ActorRef}} interaction with an instance gateway abstraction which encapsulates the communication logic. Having such an abstraction, it will be easy to decorate messages transparently with a leader session ID. Therefore, I propose to refactor the current {{Instance}} communication and to introduce an {{InstanceGateway}} abstraction.
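For illustration, a hedged sketch of what such a gateway could look like. The name follows the ticket, but this is a hypothetical outline; the interface actually introduced by the pull request may differ.

```java
import java.util.concurrent.Future;

// Encapsulates TaskManager communication so that callers (e.g. Executions)
// never touch the ActorRef directly; an implementation can transparently
// decorate every outgoing message with the leader session ID.
public interface InstanceGateway {

    /** Sends a message without waiting for an answer. */
    void tell(Object message);

    /** Sends a message and returns a future for the eventual answer. */
    Future<Object> ask(Object message, long timeoutMillis);
}
```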