[jira] [Commented] (FLINK-2329) Refactor RPCs from within the ExecutionGraph

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620642#comment-14620642
 ] 

ASF GitHub Bot commented on FLINK-2329:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/893#issuecomment-120018025
  
Very sensible change in anticipation of the upcoming high availability 
changes.

+1


 Refactor RPCs from within the ExecutionGraph
 

 Key: FLINK-2329
 URL: https://issues.apache.org/jira/browse/FLINK-2329
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 0.10


 Currently, we store an {{ActorRef}} of the TaskManager into an {{Instance}} 
 object. This {{ActorRef}} is used from within {{Executions}} to interact with 
 the {{TaskManager}}. This is not a nice abstraction since it does not hide 
 implementation details. 
 Since we need to add a leader session ID to messages sent by the 
 {{Executions}} in order to support high availability, we would need to make 
 the leader session ID available to the {{Execution}}. A better solution seems 
 to be to replace the direct {{ActorRef}} interaction with an instance gateway 
 abstraction which encapsulates the communication logic. Having such an 
 abstraction, it will be easy to decorate messages transparently with a leader 
 session ID. Therefore, I propose to refactor the current {{Instance}} 
 communication and to introduce an {{InstanceGateway}} abstraction.
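 A minimal sketch of what such a gateway could look like ({{InstanceGateway}} is the name proposed in this ticket; the method names, the {{LeaderSessionMessage}} envelope, and the plain {{UUID}} session ID are assumptions for illustration, not the actual implementation):
{code}
import java.util.UUID;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

// Hypothetical envelope that carries the leader session ID with each message.
final class LeaderSessionMessage implements java.io.Serializable {
    final UUID leaderSessionID;
    final Object message;

    LeaderSessionMessage(UUID leaderSessionID, Object message) {
        this.leaderSessionID = leaderSessionID;
        this.message = message;
    }
}

// The Execution talks to this interface only and never sees the ActorRef.
interface InstanceGateway {
    void tell(Object message);
    Future<Object> ask(Object message, FiniteDuration timeout);
}

class AkkaInstanceGateway implements InstanceGateway {
    private final ActorRef taskManager;
    private final UUID leaderSessionID;

    AkkaInstanceGateway(ActorRef taskManager, UUID leaderSessionID) {
        this.taskManager = taskManager;
        this.leaderSessionID = leaderSessionID;
    }

    @Override
    public void tell(Object message) {
        // Every outgoing message is transparently decorated with the session ID.
        taskManager.tell(new LeaderSessionMessage(leaderSessionID, message), ActorRef.noSender());
    }

    @Override
    public Future<Object> ask(Object message, FiniteDuration timeout) {
        return Patterns.ask(taskManager, new LeaderSessionMessage(leaderSessionID, message), new Timeout(timeout));
    }
}
{code}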



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-119988165
  
Did we find a solution for the random port problem?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620568#comment-14620568
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-12117
  
Yes, that makes sense. So the user will always have to connect to the web 
interface of the leading job manager, right? We could only circumvent that by 
separating the web interface from the job manager.


 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
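 For illustration, one such recipe in action: a minimal leader-election sketch using Apache Curator's {{LeaderLatch}} (Curator is one library providing the recipes mentioned above; the connection string and latch path are placeholders):
{code}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderElectionSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // All JobManager candidates create a latch on the same ZNode path;
        // ZooKeeper grants leadership to exactly one of them at a time.
        LeaderLatch latch = new LeaderLatch(client, "/flink/leaderlatch");
        latch.start();

        latch.await(); // blocks until this instance becomes the leader
        System.out.println("This instance is now the leader");
    }
}
{code}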



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2339) Prevent asynchronous checkpoint calls from overtaking each other

2015-07-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620625#comment-14620625
 ] 

Stephan Ewen commented on FLINK-2339:
-

I have a patch coming up, currently testing on Travis...

 Prevent asynchronous checkpoint calls from overtaking each other
 

 Key: FLINK-2339
 URL: https://issues.apache.org/jira/browse/FLINK-2339
 Project: Flink
  Issue Type: Improvement
  Components: TaskManager
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 0.10


 Currently, when checkpoint state materialization takes very long, and the 
 checkpoint interval is low, the asynchronous calls to trigger checkpoints (on 
 the sources) could overtake prior calls.
 We can fix that by making sure that all calls are dispatched in order by the 
 same thread, rather than spawning a new thread for each call.
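 A minimal sketch of the proposed fix (the class and method names are placeholders, not the actual Flink classes): a single-threaded executor dispatches all trigger calls in submission order, so they cannot overtake each other.
{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CheckpointTriggerDispatcher {

    // One thread dispatches all trigger calls in order, so a slow
    // checkpoint N can never be overtaken by checkpoint N+1.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();

    public void triggerCheckpoint(final long checkpointId, final long timestamp) {
        dispatcher.execute(new Runnable() {
            @Override
            public void run() {
                // call into the source's checkpoint logic here
                System.out.println("Triggering checkpoint " + checkpointId + " at " + timestamp);
            }
        });
    }
}
{code}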



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-119996182
  
In HA mode, JobManagers start with a random free port. That is fine, 
because no one connects to them based on a config value, but only based on 
ZooKeeper entries.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620572#comment-14620572
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-12744
  
The web interface is, modulo some objects which are not serializable,
already independent of the JobManager. It should not be a big problem to
only have one web server which also retrieves the leading JobManager from
ZooKeeper and then serves the information from the leader.

On Thu, Jul 9, 2015 at 4:24 PM, Max notificati...@github.com wrote:

 Yes, that makes sense. So the user will always have to connect to the web
 interface of the leading job manager, right? We could only circumvent that
 by separating the web interface from the job manager.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/flink/pull/886#issuecomment-12117.




 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-09 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-12744
  
The web interface is, modulo some objects which are not serializable,
already independent of the JobManager. It should not be a big problem to
only have one web server which also retrieves the leading JobManager from
ZooKeeper and then serves the information from the leader.

On Thu, Jul 9, 2015 at 4:24 PM, Max notificati...@github.com wrote:

 Yes, that makes sense. So the user will always have to connect to the web
 interface of the leading job manager, right? We could only circumvent that
 by separating the web interface from the job manager.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/flink/pull/886#issuecomment-12117.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120029960
  
Looks like this change breaks the YARN integration. The YARN WordCount no 
longer works.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-09 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-120036933
  
Hello @vasia 
I would like to work on both versions of Adamic Adar. As the JIRA did not 
ask for an approximate version, it was suggested that I create another JIRA 
issue which will provide a library method for Adamic Adar that gives an 
approximate solution with the use of bloom filters.
I have a query about the bloom filters. Since bloom filters only tell us 
whether an element belongs to the set or not, if both vertices have Bloom 
filters as their value, how will we know what to emit? For example, for 
Vertex 3 the bits '1,4,13' are set and for Vertex 5 the bits '2,4,13' are 
set. When we use the method you suggested, a logical AND gives us the 
intersection of the Bloom filters. After this, do you suggest that we keep 
another hashtable that tracks the value-vertex relation? Or do we just emit 
5,4,1/log(d3) and keep the hashtable as an identity map function? That would 
mean each vertex has n bits as its value, where n is the number of vertices 
in the graph. I hope I was clear in my query. TL;DR: we would have to use an 
identity hash function, which implies that each vertex needs n bits of 
memory as its value. Is it okay to use this much memory? If there is some 
other approach, please let me know. Bloom filters seem more useful for 
finding the size of the intersection or union, but here we need to know 
which vertices are common. The only other way that I can roughly imagine is 
that we get the hashed edges in a dataset, just like 5,4,1/(log d3)... use 
the same hash function on all the graph edges, then join the datasets 
obtained over fields 1 and 2. 
Please tell me if there is another efficient way, or which one of these 
two you would prefer?
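
To make the memory trade-off above concrete, here is a toy sketch with a 
plain java.util.BitSet and an identity "hash", i.e. bit i stands for vertex 
i, which is exactly the n bits per vertex discussed (a real Bloom filter 
with fewer bits could answer membership queries, but could not enumerate the 
common neighbors like this):

{code}
import java.util.BitSet;

public class NeighborhoodIntersection {
    public static void main(String[] args) {
        int n = 16; // number of vertices in the toy graph

        BitSet neighborsOf3 = new BitSet(n);
        for (int v : new int[] {1, 4, 13}) neighborsOf3.set(v);

        BitSet neighborsOf5 = new BitSet(n);
        for (int v : new int[] {2, 4, 13}) neighborsOf5.set(v);

        // A logical AND yields the intersection of the two neighborhoods.
        BitSet common = (BitSet) neighborsOf3.clone();
        common.and(neighborsOf5);

        // Only because the "hash" is the identity can we list the common
        // vertices; this prints 4 and 13.
        for (int v = common.nextSetBit(0); v >= 0; v = common.nextSetBit(v + 1)) {
            System.out.println("common neighbor: " + v);
        }
    }
}
{code}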



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34268803
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 ---
@@ -212,19 +218,19 @@ public InputGate getInputGate(int index) {
return inputGates;
}
 
-   @Override
-   public void reportAccumulators(Map<String, Accumulator<?, ?>> 
accumulators) {
-   AccumulatorEvent evt;
-   try {
-   evt = new AccumulatorEvent(getJobID(), accumulators);
-   }
-   catch (IOException e) {
-   throw new RuntimeException("Cannot serialize 
accumulators to send them to JobManager", e);
-   }
-
-   ReportAccumulatorResult accResult = new 
ReportAccumulatorResult(jobId, executionId, evt);
-   jobManagerActor.tell(accResult, ActorRef.noSender());
-   }
+// @Override
--- End diff --

This can be properly removed, no?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620697#comment-14620697
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34268803
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 ---
@@ -212,19 +218,19 @@ public InputGate getInputGate(int index) {
return inputGates;
}
 
-   @Override
-   public void reportAccumulators(Map<String, Accumulator<?, ?>> 
accumulators) {
-   AccumulatorEvent evt;
-   try {
-   evt = new AccumulatorEvent(getJobID(), accumulators);
-   }
-   catch (IOException e) {
-   throw new RuntimeException("Cannot serialize 
accumulators to send them to JobManager", e);
-   }
-
-   ReportAccumulatorResult accResult = new 
ReportAccumulatorResult(jobId, executionId, evt);
-   jobManagerActor.tell(accResult, ActorRef.noSender());
-   }
+// @Override
--- End diff --

This can be properly removed, no?


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.
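 A minimal sketch of the idea (the message and field names are placeholders, not actual Flink classes): the heartbeat payload carries a snapshot of the serialized accumulator values alongside the usual metrics.
{code}
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical heartbeat payload: next to the metrics report it carries a
// snapshot of the serialized accumulator values, so the JobManager can
// update them while the job is still running.
public class Heartbeat implements Serializable {

    private final byte[] metricsReport;
    private final Map<String, byte[]> accumulatorSnapshot;

    public Heartbeat(byte[] metricsReport, Map<String, byte[]> accumulatorSnapshot) {
        this.metricsReport = metricsReport;
        // defensive copy: the live registry keeps changing while we report
        this.accumulatorSnapshot = new HashMap<String, byte[]>(accumulatorSnapshot);
    }

    public byte[] getMetricsReport() {
        return metricsReport;
    }

    public Map<String, byte[]> getAccumulatorSnapshot() {
        return accumulatorSnapshot;
    }
}
{code}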



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2235] fix calculation of free memory fo...

2015-07-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/859#issuecomment-119987554
  
1/4 of the physical memory seems sensible and indeed returns almost the 
same memory size as the maximum memory setting on my machine. I've adapted the 
pull request.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620613#comment-14620613
 ] 

ASF GitHub Bot commented on FLINK-2008:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34262391
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 ---
@@ -53,6 +62,8 @@
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase {
 
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamCheckpointingITCase.class);
--- End diff --

Before I decided to undo parts of the checkpointing changes (sending the 
full state back  from the JM to the tasks), I had added an additional test 
there.

I can delete the logger factory.


 PersistentKafkaSource is sometimes emitting tuples multiple times
 -

 Key: FLINK-2008
 URL: https://issues.apache.org/jira/browse/FLINK-2008
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

 The PersistentKafkaSource is expected to emit records exactly once.
 Two test cases of the KafkaITCase are sporadically failing because records 
 are emitted multiple times.
 Affected tests:
 {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been 
 changed manually in ZK:
 {code}
 java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 2]
 {code}
 {{brokerFailureTest()}} also fails:
 {code}
 05/13/2015 08:13:16   Custom source -> Stream Sink(1/1) switched to FAILED 
 java.lang.AssertionError: Received tuple with value 21 twice
   at org.junit.Assert.fail(Assert.java:88)
   at org.junit.Assert.assertTrue(Assert.java:41)
   at org.junit.Assert.assertFalse(Assert.java:64)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
   at 
 org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
   at 
 org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
   at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
   at java.lang.Thread.run(Thread.java:745)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34262141
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 ---
@@ -53,6 +62,8 @@
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase {
 
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamCheckpointingITCase.class);
--- End diff --

These changes here also seem accidentally committed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620610#comment-14620610
 ] 

ASF GitHub Bot commented on FLINK-2008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34262141
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 ---
@@ -53,6 +62,8 @@
 @SuppressWarnings(serial)
 public class StreamCheckpointingITCase {
 
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamCheckpointingITCase.class);
--- End diff --

These changes here also seem accidentally committed


 PersistentKafkaSource is sometimes emitting tuples multiple times
 -

 Key: FLINK-2008
 URL: https://issues.apache.org/jira/browse/FLINK-2008
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

 The PersistentKafkaSource is expected to emit records exactly once.
 Two test cases of the KafkaITCase are sporadically failing because records 
 are emitted multiple times.
 Affected tests:
 {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been 
 changed manually in ZK:
 {code}
 java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 2]
 {code}
 {{brokerFailureTest()}} also fails:
 {code}
 05/13/2015 08:13:16   Custom source -> Stream Sink(1/1) switched to FAILED 
 java.lang.AssertionError: Received tuple with value 21 twice
   at org.junit.Assert.fail(Assert.java:88)
   at org.junit.Assert.assertTrue(Assert.java:41)
   at org.junit.Assert.assertFalse(Assert.java:64)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
   at 
 org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
   at 
 org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
   at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
   at java.lang.Thread.run(Thread.java:745)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620609#comment-14620609
 ] 

ASF GitHub Bot commented on FLINK-2008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34262084
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

was this accidentally committed?


 PersistentKafkaSource is sometimes emitting tuples multiple times
 -

 Key: FLINK-2008
 URL: https://issues.apache.org/jira/browse/FLINK-2008
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

 The PersistentKafkaSource is expected to emit records exactly once.
 Two test cases of the KafkaITCase are sporadically failing because records 
 are emitted multiple times.
 Affected tests:
 {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been 
 changed manually in ZK:
 {code}
 java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 2]
 {code}
 {{brokerFailureTest()}} also fails:
 {code}
 05/13/2015 08:13:16   Custom source -> Stream Sink(1/1) switched to FAILED 
 java.lang.AssertionError: Received tuple with value 21 twice
   at org.junit.Assert.fail(Assert.java:88)
   at org.junit.Assert.assertTrue(Assert.java:41)
   at org.junit.Assert.assertFalse(Assert.java:64)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
   at 
 org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
   at 
 org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
   at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
   at java.lang.Thread.run(Thread.java:745)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...

2015-07-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34262145
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

Yes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620611#comment-14620611
 ] 

ASF GitHub Bot commented on FLINK-2008:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34262145
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

Yes


 PersistentKafkaSource is sometimes emitting tuples multiple times
 -

 Key: FLINK-2008
 URL: https://issues.apache.org/jira/browse/FLINK-2008
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

 The PersistentKafkaSource is expected to emit records exactly once.
 Two test cases of the KafkaITCase are sporadically failing because records 
 are emitted multiple times.
 Affected tests:
 {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been 
 changed manually in ZK:
 {code}
 java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 2]
 {code}
 {{brokerFailureTest()}} also fails:
 {code}
 05/13/2015 08:13:16   Custom source -> Stream Sink(1/1) switched to FAILED 
 java.lang.AssertionError: Received tuple with value 21 twice
   at org.junit.Assert.fail(Assert.java:88)
   at org.junit.Assert.assertTrue(Assert.java:41)
   at org.junit.Assert.assertFalse(Assert.java:64)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
   at 
 org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
   at 
 org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
   at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
   at java.lang.Thread.run(Thread.java:745)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620634#comment-14620634
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-120015233
  
Alright, I've opened a JIRA for this: 
https://issues.apache.org/jira/browse/FLINK-2340


 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620654#comment-14620654
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34266800
  
--- Diff: 
flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

This must have been accidentally committed


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34266800
  
--- Diff: 
flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

This must have been accidentally committed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2235) Local Flink cluster allocates too much memory

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620538#comment-14620538
 ] 

ASF GitHub Bot commented on FLINK-2235:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/859#issuecomment-119987554
  
1/4 of the physical memory seems sensible and indeed returns almost the 
same memory size as the maximum memory setting on my machine. I've adapted the 
pull request.


 Local Flink cluster allocates too much memory
 -

 Key: FLINK-2235
 URL: https://issues.apache.org/jira/browse/FLINK-2235
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime, TaskManager
Affects Versions: 0.9
 Environment: Oracle JDK: 1.6.0_65-b14-462
 Eclipse
Reporter: Maximilian Michels
Priority: Minor

 When executing a Flink job locally, the task manager gets initialized with an 
 insane amount of memory. After a quick look in the code it seems that the 
 call to {{EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()}} 
 returns a wrong estimate of the heap memory size.
 Moreover, the same user switched to Oracle JDK 1.8 and that made the error 
 disappear. So I'm guessing this is some Java 1.6 quirk.
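 For reference, a sketch of the fallback heuristic discussed in the pull request for this issue (take 1/4 of the physical memory, which roughly matches the JVM's default maximum heap); the cast to the {{com.sun.management}} MX bean works on HotSpot-based JVMs and is an assumption of this sketch:
{code}
import java.lang.management.ManagementFactory;

public class MemorySizeSketch {

    public static long fallbackFreeMemory() {
        // The com.sun.management extension exposes the physical memory size.
        com.sun.management.OperatingSystemMXBean os =
                (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        long physical = os.getTotalPhysicalMemorySize();
        return physical / 4; // heuristic instead of the unreliable free-heap estimate
    }

    public static void main(String[] args) {
        System.out.println("Fallback memory: " + fallbackFreeMemory() + " bytes");
    }
}
{code}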



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-12117
  
Yes, that makes sense. So the user will always have to connect to the web 
interface of the leading job manager, right? We could only circumvent that by 
separating the web interface from the job manager.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34262472
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
 ---
@@ -53,13 +62,22 @@ public boolean isEndOfStream(T nextElement) {
 
@Override
public byte[] serialize(T element) {
-   DataOutputSerializer dos = new DataOutputSerializer(16);
+   if(dos == null) {
+   dos = new DataOutputSerializer(1);
--- End diff --

I think a starting size of `16` is nicer than one of `1`
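
For context, the pattern under review as a self-contained sketch (java.io.ByteArrayOutputStream stands in for DataOutputSerializer here; both grow on demand, so the initial size only determines how often small records force an early resize):

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazySerializerSketch<T extends Serializable> {

    private ByteArrayOutputStream buffer; // stand-in for DataOutputSerializer

    public byte[] serialize(T element) throws IOException {
        if (buffer == null) {
            // created once on first use; 16 bytes avoids an immediate
            // grow-on-write for small records, unlike a size of 1
            buffer = new ByteArrayOutputStream(16);
        }
        buffer.reset(); // reuse the backing array across records
        ObjectOutputStream out = new ObjectOutputStream(buffer);
        out.writeObject(element);
        out.flush();
        return buffer.toByteArray();
    }
}
{code}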


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2340) Provide standalone mode for web interface of the JobManager

2015-07-09 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-2340:
-

 Summary: Provide standalone mode for web interface of the 
JobManager
 Key: FLINK-2340
 URL: https://issues.apache.org/jira/browse/FLINK-2340
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 0.10
Reporter: Maximilian Michels


With the latest changes to enable high availability, the web interface's 
address may switch to one of the standby job manager nodes. As an enhancement, 
we could decouple the web interface and let it automatically retrieve data from 
the currently leading job manager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2341) Deadlock in SpilledSubpartitionViewAsyncIO

2015-07-09 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2341:
---

 Summary: Deadlock in SpilledSubpartitionViewAsyncIO
 Key: FLINK-2341
 URL: https://issues.apache.org/jira/browse/FLINK-2341
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.9, 0.10
Reporter: Stephan Ewen
Assignee: Ufuk Celebi
Priority: Critical
 Fix For: 0.9, 0.10


It may be that the deadlock is caused by the way the 
{{SpilledSubpartitionViewTest}} is written.

{code}
Found one Java-level deadlock:
=
pool-25-thread-2:
  waiting to lock monitor 0x7f66f4932468 (object 0xfa1478f0, a 
java.lang.Object),
  which is held by IOManager reader thread #1
IOManager reader thread #1:
  waiting to lock monitor 0x7f66f4931160 (object 0xfa029768, a 
java.lang.Object),
  which is held by pool-25-thread-2

Java stack information for the threads listed above:
===
pool-25-thread-2:
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.notifyError(SpilledSubpartitionViewAsyncIO.java:304)
- waiting to lock 0xfa1478f0 (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.onAvailableBuffer(SpilledSubpartitionViewAsyncIO.java:256)
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$300(SpilledSubpartitionViewAsyncIO.java:42)
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:367)
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:353)
at 
org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:135)
- locked 0xfa029768 (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
- locked 0xfa3a1a20 (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:95)
at 
org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:39)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:701)
IOManager reader thread #1:
at 
org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:127)
- waiting to lock 0xfa029768 (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
- locked 0xfa3a1ea0 (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.returnBufferFromIOThread(SpilledSubpartitionViewAsyncIO.java:270)
- locked 0xfa1478f0 (a java.lang.Object)
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$100(SpilledSubpartitionViewAsyncIO.java:42)
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:338)
at 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:328)
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199)
at 
org.apache.flink.runtime.io.disk.iomanager.BufferReadRequest.requestDone(AsynchronousFileIOChannel.java:431)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:377)
{code}

The full log with the deadlock stack traces can be found here:

https://s3.amazonaws.com/archive.travis-ci.org/jobs/70232347/log.txt
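
The trace shows a classic lock-order inversion: pool-25-thread-2 holds the recycler's monitor and waits for the view's monitor, while the IOManager reader thread holds the view's monitor and waits for the recycler's. A generic sketch of the standard remedy, independent of whatever the actual fix turns out to be (all names are placeholders): acquire the two monitors in one global order on every path.

{code}
public class LockOrderingSketch {

    private final Object viewLock = new Object();     // e.g. the subpartition view's monitor
    private final Object recyclerLock = new Object(); // e.g. the buffer recycler's monitor

    // Every path that needs both monitors takes viewLock first, then
    // recyclerLock. With a single global order, the circular wait seen
    // in the trace above cannot occur.
    public void returnBufferFromIOThread() {
        synchronized (viewLock) {
            synchronized (recyclerLock) {
                // recycle the buffer and update the view's state
            }
        }
    }

    public void onAvailableBuffer() {
        synchronized (viewLock) {      // same order, never reversed
            synchronized (recyclerLock) {
                // hand the buffer back to the view
            }
        }
    }
}
{code}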



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34261966
  
--- Diff: docs/apis/streaming_guide.md ---
@@ -1300,6 +1300,9 @@ Another way of exposing user defined operator state 
for the Flink runtime for ch
 
 When the user defined function implements the `Checkpointed` interface, 
the `snapshotState(…)` and `restoreState(…)` methods will be executed to 
draw and restore function state.
 
+In addition to that, user functions can also implement the 
`CheckpointNotifier` interface to receive notifications on completed 
checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
+Note that there is no guarantee for the user function to receive a 
notification once the checkpoint is complete.
--- End diff --

Let's write it like "Note that there is no guarantee for the user function 
to receive a notification if a failure happens between checkpoint completion 
and notification. The notifications should hence be treated in a way that 
notifications from later checkpoints can subsume missing notifications."
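
A sketch of a user function following that guidance (the `CheckpointNotifier` interface and `notifyCheckpointComplete(long)` are named in the docs above; the import path and the commit bookkeeping are assumptions of this sketch):

{code}
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;

// Treats a notification for checkpoint N as subsuming any missed
// notifications for checkpoints before N.
public class CommittingSink implements CheckpointNotifier {

    private long lastCommittedCheckpoint = -1;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId > lastCommittedCheckpoint) {
            // commit all pending side effects up to and including checkpointId
            lastCommittedCheckpoint = checkpointId;
        }
        // notifications for already-covered checkpoints are ignored
    }
}
{code}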


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34262084
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

was this accidentally committed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620608#comment-14620608
 ] 

ASF GitHub Bot commented on FLINK-2008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/895#discussion_r34261966
  
--- Diff: docs/apis/streaming_guide.md ---
@@ -1300,6 +1300,9 @@ Another way of exposing user defined operator state 
for the Flink runtime for ch
 
 When the user defined function implements the `Checkpointed` interface, 
the `snapshotState(…)` and `restoreState(…)` methods will be executed to draw 
and restore function state.
 
+In addition to that, user functions can also implement the 
`CheckpointNotifier` interface to receive notifications on completed 
checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
+Note that there is no guarantee for the user function to receive a 
notification once the checkpoint is complete.
--- End diff --

Let's write it like "Note that there is no guarantee for the user function 
to receive a notification if a failure happens between checkpoint completion 
and notification. The notifications should hence be treated in a way that 
notifications from later checkpoints can subsume missing notifications."


 PersistentKafkaSource is sometimes emitting tuples multiple times
 -

 Key: FLINK-2008
 URL: https://issues.apache.org/jira/browse/FLINK-2008
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

 The PersistentKafkaSource is expected to emit records exactly once.
 Two test cases of the KafkaITCase are sporadically failing because records 
 are emitted multiple times.
 Affected tests:
 {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been 
 changed manually in ZK:
 {code}
 java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 2]
 {code}
 {{brokerFailureTest()}} also fails:
 {code}
 05/13/2015 08:13:16   Custom source -> Stream Sink(1/1) switched to FAILED 
 java.lang.AssertionError: Received tuple with value 21 twice
   at org.junit.Assert.fail(Assert.java:88)
   at org.junit.Assert.assertTrue(Assert.java:41)
   at org.junit.Assert.assertFalse(Assert.java:64)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
   at 
 org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
   at 
 org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
   at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
   at java.lang.Thread.run(Thread.java:745)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-09 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-120033517
  
Hello @vasia 
I would like to work on both versions of Adamic Adar. As the JIRA did not 
ask for an approximate version, it was suggested that I create another JIRA 
issue which will provide a library method for Adamic Adar that gives an 
approximate solution with the use of bloom filters.
I have a query about the bloom filters. Since bloom filters only tell us 
whether an element belongs to the set or not, if both vertices have Bloom 
filters as their value, how will we know what to search for in the other 
set? For example, for Vertex 3 the bits '1,4,13' are set and for Vertex 5 
the bits '2,4,13' are set. When we use the method you suggested, we will 
find that 4 and 13 are set for 5 too. Now what tuple should it emit? Do you 
suggest that we keep another hashtable that tracks the value-vertex 
relation? Or do we just emit 5,4,1/log(d3) and keep the hashtable as an 
identity map function? That would mean each vertex has n bits as its value, 
where n is the number of vertices in the graph. I hope I was clear in my 
query. TL;DR: we would have to use an identity hash function, which implies 
that each vertex needs n bits of memory as its value. Is it okay to use this 
much memory? If there is some other approach, please let me know. Bloom 
filters seem more useful for finding the size of the intersection or union, 
but here we need to know which vertices are common. The only other way that 
I can roughly imagine is that we get the hashed edges in a dataset, just 
like 5,4,1/(log d3)... use the same hash function on all the graph edges, 
then join the datasets obtained over fields 1 and 2. 
Please tell me if there is another efficient way, or which one of these 
two you would prefer?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34269026
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -237,12 +246,13 @@ public Task(TaskDeploymentDescriptor tdd,
 
this.memoryManager = checkNotNull(memManager);
this.ioManager = checkNotNull(ioManager);
-   this.broadcastVariableManager =checkNotNull(bcVarManager);
+   this.broadcastVariableManager = checkNotNull(bcVarManager);
+   this.accumulatorRegistry = accumulatorRegistry;
 
this.jobManager = checkNotNull(jobManagerActor);
this.taskManager = checkNotNull(taskManagerActor);
this.actorAskTimeout = new 
Timeout(checkNotNull(actorAskTimeout));
-   
--- End diff --

Lots of auto-reformats...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620708#comment-14620708
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34269026
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -237,12 +246,13 @@ public Task(TaskDeploymentDescriptor tdd,
 
this.memoryManager = checkNotNull(memManager);
this.ioManager = checkNotNull(ioManager);
-   this.broadcastVariableManager =checkNotNull(bcVarManager);
+   this.broadcastVariableManager = checkNotNull(bcVarManager);
+   this.accumulatorRegistry = accumulatorRegistry;
 
this.jobManager = checkNotNull(jobManagerActor);
this.taskManager = checkNotNull(taskManagerActor);
this.actorAskTimeout = new 
Timeout(checkNotNull(actorAskTimeout));
-   
--- End diff --

Lots of auto-reformats...


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620707#comment-14620707
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34268940
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -172,13 +173,20 @@
 
/** The library cache, from which the task can request its required JAR 
files */
private final LibraryCacheManager libraryCache;
-   
+
/** The cache for user-defined files that the invokable requires */
private final FileCache fileCache;
-   
+
/** The gateway to the network stack, which handles inputs and produced 
results */
private final NetworkEnvironment network;
 
+   /** The registry of this task which enables live reporting of 
accumulators */
+   private final AccumulatorRegistry accumulatorRegistry;
+
+   public AccumulatorRegistry getAccumulatorRegistry() {
--- End diff --

The structure of the class declares fields in one section, and getters in 
another. Would be good to follow that, makes the code easier to navigate.


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620750#comment-14620750
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34271710
  
--- Diff: 
flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

Probably the reason that the YARN tests are broken...


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120054333
  
There is a potential modification conflict: drawing a snapshot for 
serialization while registering a new accumulator can lead to a 
ConcurrentModificationException in the drawing of the snapshot.
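For context, a minimal sketch of the conflict (class and field names hypothetical): 
drawing the snapshot and registering an accumulator must go through the same lock, 
otherwise the map can be mutated mid-copy and throw the exception described above.
{code}
import java.util.HashMap;
import java.util.Map;

class SnapshotSketch {

    private final Object lock = new Object();
    private final Map<String, Long> accumulators = new HashMap<>();

    void register(String name) {
        synchronized (lock) {
            accumulators.put(name, 0L);  // registration mutates the map
        }
    }

    Map<String, Long> drawSnapshot() {
        synchronized (lock) {
            // Copying under the same lock prevents a concurrent register()
            // from mutating the map mid-iteration (the CME scenario).
            return new HashMap<>(accumulators);
        }
    }
}
{code}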


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273441
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -601,6 +636,11 @@ void markFinished() {
}
}
 
+   synchronized (accumulatorLock) {
--- End diff --

Ignore this comment. The lock is good :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620786#comment-14620786
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120058469
  
The naming of the accumulators sometimes refers to "flink" vs. 
"user-defined", and sometimes to "internal" vs. "external". Can we make this 
consistent? I actually like the "flink" vs. "user-defined" naming better.


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620776#comment-14620776
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273136
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -601,6 +636,11 @@ void markFinished() {
}
}
 
+   synchronized (accumulatorLock) {
--- End diff --

This lock here seems redundant. Nothing relies on those two being in sync.


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34274318
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -172,13 +173,20 @@
 
/** The library cache, from which the task can request its required JAR 
files */
private final LibraryCacheManager libraryCache;
-   
+
/** The cache for user-defined files that the invokable requires */
private final FileCache fileCache;
-   
+
/** The gateway to the network stack, which handles inputs and produced 
results */
private final NetworkEnvironment network;
 
+   /** The registry of this task which enables live reporting of 
accumulators */
+   private final AccumulatorRegistry accumulatorRegistry;
--- End diff --

Since the AccumulatorRegistry is only used task-internally, and always 
retrieved from there, it should be initialized internally. That saves one more 
constructor parameter and helps with separation of concerns.
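A sketch of the suggested shape (constructor details hypothetical): the task owns 
and initializes its registry, rather than receiving it as a constructor argument.
{code}
class AccumulatorRegistry { /* stand-in for the class added in this PR */ }

class Task {

    // The task creates its own registry, so the constructor does not need
    // an extra AccumulatorRegistry parameter.
    private final AccumulatorRegistry accumulatorRegistry = new AccumulatorRegistry();

    AccumulatorRegistry getAccumulatorRegistry() {
        return accumulatorRegistry;
    }
}
{code}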


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620768#comment-14620768
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120054333
  
There is a potential modification conflict: drawing a snapshot for 
serialization while registering a new accumulator can lead to a 
ConcurrentModificationException in the drawing of the snapshot.


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620801#comment-14620801
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34274318
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -172,13 +173,20 @@
 
/** The library cache, from which the task can request its required JAR 
files */
private final LibraryCacheManager libraryCache;
-   
+
/** The cache for user-defined files that the invokable requires */
private final FileCache fileCache;
-   
+
/** The gateway to the network stack, which handles inputs and produced 
results */
private final NetworkEnvironment network;
 
+   /** The registry of this task which enables live reporting of 
accumulators */
+   private final AccumulatorRegistry accumulatorRegistry;
--- End diff --

Since the AccumulatorRegistry is only used task-internally, and always 
retrieved from there, it should be initialized internally. That saves one more 
constructor parameter and helps with separation of concerns.


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620810#comment-14620810
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34274983
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined 
accumulators.
+ */
+public class AccumulatorRegistry {
--- End diff --

I do not understand the differentiation between the implementations of the 
`Internal` and `External` registry. From the usage pattern, both are accessed 
and initialized with a hash map. In one case the hash map is created by the 
caller, in the other by the registry. I have not found a place where it would not work 
for the registry to always create the map immediately.
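A sketch of the simplification being asked for (class shape hypothetical): the 
registry always creates its backing maps itself, so no caller ever has to pass one in.
{code}
import java.util.HashMap;
import java.util.Map;

class Accumulator { /* stand-in for Flink's accumulator interface */ }

class RegistrySketch {

    // The registry creates its backing maps immediately; there is no variant
    // where the caller has to supply a pre-built map.
    private final Map<String, Accumulator> flinkAccumulators = new HashMap<>();
    private final Map<String, Accumulator> userAccumulators = new HashMap<>();

    void registerFlinkAccumulator(String name, Accumulator acc) {
        flinkAccumulators.put(name, acc);
    }

    void registerUserAccumulator(String name, Accumulator acc) {
        userAccumulators.put(name, acc);
    }
}
{code}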


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34274983
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined 
accumulators.
+ */
+public class AccumulatorRegistry {
--- End diff --

I do not understand the differentiation between the implementations of the 
`Internal` and `External` registry. From the usage pattern, both are accessed 
and initialized with a hash map. In one case the hash map is created by the 
caller, in the other by the registry. I have not found a place where it would not work 
for the registry to always create the map immediately.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/895#issuecomment-120065921
  
Except for the issue mentioned by Gyula (the double commit of the head), 
this looks good. I would like to merge this later today or tomorrow. Could 
address Gyula's comment while merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620724#comment-14620724
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34270017
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
 ---
@@ -57,6 +58,12 @@
 
private final BarrierBuffer barrierBuffer;
 
+   /**
+* Counters for the number of bytes read and records processed.
+*/
+   private LongCounter numRecordsRead = null;
--- End diff --

`null` initializations are redundant. They still get executed (for 
OpenJDK javac and Oracle javac), so they are overhead for no reason.
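For illustration, the two forms side by side; the explicit null is what the JVM 
would assign anyway.
{code}
class Counters {
    private Object explicitlyNulled = null; // javac emits a real assignment for this
    private Object leftToDefault;           // identical value, no generated assignment
}
{code}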


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34270017
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
 ---
@@ -57,6 +58,12 @@
 
private final BarrierBuffer barrierBuffer;
 
+   /**
+* Counters for the number of bytes read and records processed.
+*/
+   private LongCounter numRecordsRead = null;
--- End diff --

`null` initializations are redundant. They still get executed (for 
OpenJDK javac and Oracle javac), so they are overhead for no reason.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2340) Provide standalone mode for web interface of the JobManager

2015-07-09 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620767#comment-14620767
 ] 

Ufuk Celebi commented on FLINK-2340:


+1 I think separation is the way to go.

 Provide standalone mode for web interface of the JobManager
 ---

 Key: FLINK-2340
 URL: https://issues.apache.org/jira/browse/FLINK-2340
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 0.10
Reporter: Maximilian Michels

 With the latest changes to enable high availability, the web interface's 
 address may switch to one of the standby job manager nodes. As an 
 enhancement, we could decouple the web interface and let it automatically 
 retrieve data from the currently leading job manager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120058469
  
The naming of the accumulators sometimes refers to "flink" vs. 
"user-defined", and sometimes to "internal" vs. "external". Can we make this 
consistent? I actually like the "flink" vs. "user-defined" naming better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34271710
  
--- Diff: 
flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

Probably the reason that the YARN tests are broken...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2341) Deadlock in SpilledSubpartitionViewAsyncIO

2015-07-09 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620782#comment-14620782
 ] 

Ufuk Celebi commented on FLINK-2341:


Thanks for the stacktrace. I will look into it soon. The asynchronous variant 
is not used by default, so this does not affect any user until it's fixed.

 Deadlock in SpilledSubpartitionViewAsyncIO
 --

 Key: FLINK-2341
 URL: https://issues.apache.org/jira/browse/FLINK-2341
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.9, 0.10
Reporter: Stephan Ewen
Assignee: Ufuk Celebi
Priority: Critical
 Fix For: 0.9, 0.10


 It may be that the deadlock is because of the way the 
 {{SpilledSubpartitionViewTest}} is written
 {code}
 Found one Java-level deadlock:
 =============================
 pool-25-thread-2:
   waiting to lock monitor 0x7f66f4932468 (object 0xfa1478f0, a 
 java.lang.Object),
   which is held by IOManager reader thread #1
 IOManager reader thread #1:
   waiting to lock monitor 0x7f66f4931160 (object 0xfa029768, a 
 java.lang.Object),
   which is held by pool-25-thread-2
 Java stack information for the threads listed above:
 ===================================================
 pool-25-thread-2:
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.notifyError(SpilledSubpartitionViewAsyncIO.java:304)
   - waiting to lock 0xfa1478f0 (a java.lang.Object)
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.onAvailableBuffer(SpilledSubpartitionViewAsyncIO.java:256)
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$300(SpilledSubpartitionViewAsyncIO.java:42)
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:367)
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:353)
   at 
 org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:135)
   - locked 0xfa029768 (a java.lang.Object)
   at 
 org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
   - locked 0xfa3a1a20 (a java.lang.Object)
   at 
 org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:95)
   at 
 org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:39)
   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:701)
 IOManager reader thread #1:
   at 
 org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:127)
   - waiting to lock 0xfa029768 (a java.lang.Object)
   at 
 org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
   - locked 0xfa3a1ea0 (a java.lang.Object)
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.returnBufferFromIOThread(SpilledSubpartitionViewAsyncIO.java:270)
   - locked 0xfa1478f0 (a java.lang.Object)
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$100(SpilledSubpartitionViewAsyncIO.java:42)
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:338)
   at 
 org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:328)
   at 
 org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199)
   at 
 org.apache.flink.runtime.io.disk.iomanager.BufferReadRequest.requestDone(AsynchronousFileIOChannel.java:431)
   at 
 org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:377)
 {code}
 The full log with the deadlock stack traces can be found here:
 https://s3.amazonaws.com/archive.travis-ci.org/jobs/70232347/log.txt
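For readers of the trace: the two threads each hold one monitor and wait for the 
other's, a classic lock-order inversion. A minimal standalone reproduction (lock 
names chosen after the trace, everything else hypothetical):
{code}
public class LockOrderInversion {

    private static final Object bufferPoolLock = new Object(); // cf. 0xfa029768
    private static final Object viewLock = new Object();       // cf. 0xfa1478f0

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (bufferPoolLock) {  // like pool-25-thread-2: holds the pool lock
                sleep();
                synchronized (viewLock) {}   // ...and waits for the view lock
            }
        }).start();
        new Thread(() -> {
            synchronized (viewLock) {        // like the IO reader thread: holds the view lock
                sleep();
                synchronized (bufferPoolLock) {}  // ...and waits for the pool lock -> deadlock
            }
        }).start();
    }

    private static void sleep() {
        try { Thread.sleep(100); } catch (InterruptedException ignored) {}
    }
}
{code}
Running jstack against this program shows the same "waiting to lock / which is 
held by" pattern as the report above.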



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620780#comment-14620780
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273441
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -601,6 +636,11 @@ void markFinished() {
}
}
 
+   synchronized (accumulatorLock) {
--- End diff --

Ignore this comment. The lock is good :-)


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620756#comment-14620756
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34272101
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     if (!isConnected) {
       log.debug(s"Dropping message $message because the TaskManager is currently " +
         "not connected to a JobManager.")
-    }
+    } else {
 
-      // we order the messages by frequency, to make sure the code paths for matching
-      // are as short as possible
-      message match {
+      // we order the messages by frequency, to make sure the code paths for matching
+      // are as short as possible
+      message match {
+
+        // tell the task about the availability of a new input partition
+        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+
+        // tell the task about the availability of some new input partitions
+        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+          updateTaskInputPartitions(executionID, partitionInfos)
+
+        // discards intermediate result partitions of a task execution on this TaskManager
+        case FailIntermediateResultPartitions(executionID) =>
+          log.info("Discarding the results produced by task execution " + executionID)
+          if (network.isAssociated) {
+            try {
+              network.getPartitionManager.releasePartitionsProducedBy(executionID)
+            } catch {
+              case t: Throwable => killTaskManagerFatal(
+                "Fatal leak: Unable to release intermediate result partition data", t)
+            }
+          }
 
-        // tell the task about the availability of a new input partition
-        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
-          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+        // notifies the TaskManager that the state of a task has changed.
+        // the TaskManager informs the JobManager and cleans up in case the transition
+        // was into a terminal state, or in case the JobManager cannot be informed of the
+        // state transition
 
-        // tell the task about the availability of some new input partitions
-        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
-          updateTaskInputPartitions(executionID, partitionInfos)
+        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
 
-        // discards intermediate result partitions of a task execution on this TaskManager
-        case FailIntermediateResultPartitions(executionID) =>
-          log.info("Discarding the results produced by task execution " + executionID)
-          if (network.isAssociated) {
-            try {
-              network.getPartitionManager.releasePartitionsProducedBy(executionID)
-            } catch {
-              case t: Throwable => killTaskManagerFatal(
-                "Fatal leak: Unable to release intermediate result partition data", t)
-            }
-          }
+          // we receive these from our tasks and forward them to the JobManager
--- End diff --

There is a lot of changed code here that was seemingly edited without need (it has 
nothing to do with the accumulators). Since that is pretty sensitive code, I 
feel very hesitant to commit these massive edits. What was the reason for these 
changes in the first place?


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34272101
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     if (!isConnected) {
       log.debug(s"Dropping message $message because the TaskManager is currently " +
         "not connected to a JobManager.")
-    }
+    } else {
 
-      // we order the messages by frequency, to make sure the code paths for matching
-      // are as short as possible
-      message match {
+      // we order the messages by frequency, to make sure the code paths for matching
+      // are as short as possible
+      message match {
+
+        // tell the task about the availability of a new input partition
+        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+
+        // tell the task about the availability of some new input partitions
+        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+          updateTaskInputPartitions(executionID, partitionInfos)
+
+        // discards intermediate result partitions of a task execution on this TaskManager
+        case FailIntermediateResultPartitions(executionID) =>
+          log.info("Discarding the results produced by task execution " + executionID)
+          if (network.isAssociated) {
+            try {
+              network.getPartitionManager.releasePartitionsProducedBy(executionID)
+            } catch {
+              case t: Throwable => killTaskManagerFatal(
+                "Fatal leak: Unable to release intermediate result partition data", t)
+            }
+          }
 
-        // tell the task about the availability of a new input partition
-        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
-          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+        // notifies the TaskManager that the state of a task has changed.
+        // the TaskManager informs the JobManager and cleans up in case the transition
+        // was into a terminal state, or in case the JobManager cannot be informed of the
+        // state transition
 
-        // tell the task about the availability of some new input partitions
-        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
-          updateTaskInputPartitions(executionID, partitionInfos)
+        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
 
-        // discards intermediate result partitions of a task execution on this TaskManager
-        case FailIntermediateResultPartitions(executionID) =>
-          log.info("Discarding the results produced by task execution " + executionID)
-          if (network.isAssociated) {
-            try {
-              network.getPartitionManager.releasePartitionsProducedBy(executionID)
-            } catch {
-              case t: Throwable => killTaskManagerFatal(
-                "Fatal leak: Unable to release intermediate result partition data", t)
-            }
-          }
+          // we receive these from our tasks and forward them to the JobManager
--- End diff --

There is a lot of changed code here that was seemingly edited without need (it has 
nothing to do with the accumulators). Since that is pretty sensitive code, I 
feel very hesitant to commit these massive edits. What was the reason for these 
changes in the first place?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620773#comment-14620773
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273000
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -131,6 +133,35 @@

private SerializedValue<StateHandle<?>> operatorState;
 
+   /* Lock for updating the accumulators atomically. */
--- End diff --

Why not follow the style of the remaining class, with respect to empty 
lines between fields?


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273000
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -131,6 +133,35 @@

private SerializedValue<StateHandle<?>> operatorState;
 
+   /* Lock for updating the accumulators atomically. */
--- End diff --

Why not follow the style of the remaining class, with respect to empty 
lines between fields?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2280) GenericTypeComparator.compare() does not respect ascending flag

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620081#comment-14620081
 ] 

ASF GitHub Bot commented on FLINK-2280:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/894#issuecomment-119865296
  
Good catch!

+1 to merge


 GenericTypeComparator.compare() does not respect ascending flag
 ---

 Key: FLINK-2280
 URL: https://issues.apache.org/jira/browse/FLINK-2280
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 0.9
Reporter: Fabian Hueske
Assignee: Fabian Hueske
Priority: Critical
 Fix For: 0.9.1


 The {{GenericTypeComparator.compare()}} method does not respect the 
 {{ascending}} flag that is used for inverted sorting.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2240) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620119#comment-14620119
 ] 

ASF GitHub Bot commented on FLINK-2240:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/888#issuecomment-119873635
  
Nice, thank you.

I will try to take a look soon...


 Use BloomFilter to minimize probe side records which are spilled to disk in 
 Hybrid-Hash-Join
 

 Key: FLINK-2240
 URL: https://issues.apache.org/jira/browse/FLINK-2240
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor

 In Hybrid-Hash-Join, when the small table does not fit into memory, part of the 
 small table data is spilled to disk, and the counterpart partition of the 
 big table data is spilled to disk in the probe phase as well. If we build a 
 BloomFilter while spilling the small table to disk during the build phase, and 
 use it to filter the big table records that would otherwise be spilled to disk, 
 this may greatly reduce the spilled big table file size and save the disk IO 
 cost for writing and further reading.
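 A sketch of the idea (the BloomFilter here is hand-rolled and hypothetical, not 
 Flink's eventual implementation): the build phase records the key hashes of a 
 spilled partition, and the probe phase drops records that definitely have no 
 build-side match instead of spilling them.
 {code}
 import java.util.BitSet;

 class SpillBloomFilter {

     private final BitSet bits;
     private final int size;

     SpillBloomFilter(int size) {
         this.size = size;
         this.bits = new BitSet(size);
     }

     /** Build phase: record the hash of every key in the spilled partition. */
     void addBuildKey(long keyHash) {
         bits.set(index(keyHash, 0x9E3779B9L));
         bits.set(index(keyHash, 0x85EBCA6BL));
     }

     /**
      * Probe phase: false means "definitely no build-side match", so the
      * probe record can be dropped instead of spilled to disk.
      */
     boolean mightMatch(long keyHash) {
         return bits.get(index(keyHash, 0x9E3779B9L))
                 && bits.get(index(keyHash, 0x85EBCA6BL));
     }

     private int index(long keyHash, long seed) {
         long h = keyHash * seed;
         int v = (int) (h ^ (h >>> 32));
         return (v & 0x7fffffff) % size;
     }
 }
 {code}
 False positives only cost some unnecessary spilling; correctness is never 
 affected, which is what makes the filter a pure IO optimization.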



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2240] Use BloomFilter to filter probe r...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/888#issuecomment-119873635
  
Nice, thank you.

I will try to take a look soon...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2293) Division by Zero Exception

2015-07-09 Thread Andra Lungu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620123#comment-14620123
 ] 

Andra Lungu commented on FLINK-2293:


I should not have tried anything yesterday... I was too tired to make things 
work. The flink version on the cluster was updated, however I forgot to update 
the jar. I am doing this as we speak and will keep you posted once the job 
finishes. Sorry! 

 Division by Zero Exception
 --

 Key: FLINK-2293
 URL: https://issues.apache.org/jira/browse/FLINK-2293
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Assignee: Stephan Ewen
Priority: Critical
 Fix For: 0.10, 0.9.1


 I am basically running an algorithm that simulates a Gather Sum Apply 
 Iteration that performs Triangle Count (why simulate it? Because you just 
 need a superstep - useless overhead if you use the runGatherSumApply 
 function in Graph).
 What happens, at a high level:
 1). Select neighbors with an ID greater than the one corresponding to the 
 current vertex;
 2). Propagate the received values to neighbors with a higher ID;
 3). Compute the number of triangles by checking whether
 trgVertex.getValue().get(srcVertex.getId());
 As you can see, I *do not* perform any division at all;
 code is here: 
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
 Now for small graphs, 50MB max, the computation finishes nicely with the 
 correct result. For a 10GB graph, however, I got this:
 java.lang.ArithmeticException: / by zero
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:836)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:819)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544)
 at 
 org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
 at 
 org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
 at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
 at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
 at java.lang.Thread.run(Thread.java:722)
 see the full log here: https://gist.github.com/andralungu/984774f6348269df7951



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2280) GenericTypeComparator.compare() does not respect ascending flag

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620130#comment-14620130
 ] 

ASF GitHub Bot commented on FLINK-2280:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/894#issuecomment-119875351
  
Merging


 GenericTypeComparator.compare() does not respect ascending flag
 ---

 Key: FLINK-2280
 URL: https://issues.apache.org/jira/browse/FLINK-2280
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 0.9
Reporter: Fabian Hueske
Assignee: Fabian Hueske
Priority: Critical
 Fix For: 0.9.1


 The {{GenericTypeComparator.compare()}} method does not respect the 
 {{ascending}} flag that is used for inverted sorting.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-1239) Fix iteration example getting stuck with large input

2015-07-09 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-1239.
-
Resolution: Not A Problem

 Fix iteration example getting stuck with large input
 

 Key: FLINK-1239
 URL: https://issues.apache.org/jira/browse/FLINK-1239
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Gábor Hermann
Assignee: Gábor Hermann

 When running the streaming iteration example with the buffer timeout set to 0 
 (meaning the StreamRecordWriter gets flushed after every emit in every task), 
 the iteration gets stuck at flushing the output after emitting a record. This 
 happens only with a larger number of inputs (e.g. 1000 records to iterate on).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2261) Remove reduce/aggregation from DataStream

2015-07-09 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-2261.
-
Resolution: Fixed

 Remove reduce/aggregation from DataStream
 -

 Key: FLINK-2261
 URL: https://issues.apache.org/jira/browse/FLINK-2261
 Project: Flink
  Issue Type: Improvement
  Components: Java API, Scala API, Streaming
Affects Versions: 0.10
Reporter: Gyula Fora
Assignee: Gyula Fora

 Currently we have reduce and aggregation methods for non-grouped DataStreams 
 as well, which will produce local aggregates depending on the parallelism of 
 the operator.
 This behaviour is neither intuitive nor useful, as it only produces sensible 
 results if the user specifically sets the parallelism to 1, which should not 
 be encouraged.
 I would like to remove these methods from the DataStream api and only keep it 
 for GroupedDataStreams and WindowedDataStream where the aggregation is either 
 executed per-key or per-window.
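 To make the pitfall concrete, a plain-Java analogy (no Flink API involved): 
 reducing each parallel partition independently yields one partial result per 
 partition, never a single global aggregate.
 {code}
 import java.util.Arrays;
 import java.util.List;

 public class LocalReducePitfall {

     public static void main(String[] args) {
         // Two "parallel instances", each seeing only its own partition.
         List<List<Integer>> partitions = Arrays.asList(
                 Arrays.asList(1, 2, 3),
                 Arrays.asList(4, 5));

         // A non-grouped parallel reduce produces one local sum per partition...
         partitions.forEach(p -> System.out.println(
                 "local aggregate: " + p.stream().mapToInt(Integer::intValue).sum()));
         // ...so the user never sees the single global sum (15) unless parallelism is 1.
     }
 }
 {code}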



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2334) IOException: Channel to path could not be opened

2015-07-09 Thread David Heller (JIRA)
David Heller created FLINK-2334:
---

 Summary: IOException: Channel to path could not be opened
 Key: FLINK-2334
 URL: https://issues.apache.org/jira/browse/FLINK-2334
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.9
 Environment: local and cluster environment; Linux and MacOS
Reporter: David Heller
Priority: Minor


We've encountered an IOException due to missing temporary files (see stacktrace 
at the bottom). It occurred both in local and cluster execution and on MacOS as 
well as on Linux. Data size does not seem to be the reason: the error occurred 
on a 6.5GB dataset as well as on a small 400MB dataset.

Our code uses Bulk iterations and the suspicion is that cached build-side files 
are accidentally removed too early. As far as we observed it, the exception 
always happens in an iteration later than the first one (mostly iteration 2).

Interestingly, on one occasion the error disappeared consistently when setting 
the number of maximum iterations higher (from 2 to 6).
On another occasion, the exception appeared when adding a simple map operator 
at the end (holding the identity function).
Generally, the error is quite hard to reproduce.

The stacktrace:

java.io.IOException: Channel to path 
'/var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel'
 could not be opened.
at 
org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:61)
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.<init>(AsynchronousFileIOChannel.java:86)
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.<init>(AsynchronousBulkBlockReader.java:46)
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.<init>(AsynchronousBulkBlockReader.java:39)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBulkBlockChannelReader(IOManagerAsync.java:263)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:751)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at 
org.apache.flink.runtime.operators.hash.ReOpenableMutableHashTable.prepareNextPartition(ReOpenableMutableHashTable.java:167)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544)
at 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at 
org.apache.flink.runtime.operators.AbstractCachedBuildSideMatchDriver.run(AbstractCachedBuildSideMatchDriver.java:155)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
at 
org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.FileNotFoundException: 
/var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel
 (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:124)
at 
org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:57)
... 16 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2335) Rework iteration construction in StreamGraph

2015-07-09 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-2335:
-

 Summary: Rework iteration construction in StreamGraph
 Key: FLINK-2335
 URL: https://issues.apache.org/jira/browse/FLINK-2335
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Gyula Fora
Assignee: Gyula Fora


Currently the nodes representing the extra sinks and sources are incrementally 
added to the StreamGraph when the user creates the iterative parts of the 
program.

This makes it difficult to enforce different partitioning schemes on the 
feedback edges and also makes it virtually impossible to handle multiple iteration 
heads with different parallelism.

The actual nodes in the StreamGraph for the iteration sinks/sources should only 
be created when the program is finalized, after the user calls execute() and 
before we create the JobGraph.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping

2015-07-09 Thread William Saar (JIRA)
William Saar created FLINK-2336:
---

 Summary: ArrayIndexOufOBoundsException in TypeExtractor when 
mapping
 Key: FLINK-2336
 URL: https://issues.apache.org/jira/browse/FLINK-2336
 Project: Flink
  Issue Type: Bug
Affects Versions: master
Reporter: William Saar


The line that causes this is
DataStream<O> outputStream = insideIterationStream.filter(outputFilter).map(m -> m.outputMessage);

The problem occurs both when compiled with javac and with Eclipse's JDT compiler 
(in an environment where simple lambda type tests work).

Exception in thread main java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(Unknown Source)
at java.util.ArrayList.get(Unknown Source)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91)
at 
org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping

2015-07-09 Thread William Saar (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620345#comment-14620345
 ] 

William Saar commented on FLINK-2336:
-

When replacing the lambda with a MapFunction instance, one gets the following 
clearer error message

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'O' in 'class ...' could not be determined. This is most likely a 
type erasure problem. The type extraction currently supports types with generic 
variables only in cases where all variables in the return type can be deduced 
from the input type(s).
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:473)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:361)
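For reference, the usual way out when a type variable like 'O' cannot be deduced 
is to bind the types explicitly, for example with an anonymous MapFunction instead 
of the lambda (Message, Output, and the surrounding variables are hypothetical here):
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// Concrete type arguments give the TypeExtractor everything it needs:
DataStream<Output> outputStream = insideIterationStream
        .filter(outputFilter)
        .map(new MapFunction<Message, Output>() {
            @Override
            public Output map(Message m) {
                return m.outputMessage;
            }
        });
{code}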

 ArrayIndexOufOBoundsException in TypeExtractor when mapping
 ---

 Key: FLINK-2336
 URL: https://issues.apache.org/jira/browse/FLINK-2336
 Project: Flink
  Issue Type: Bug
Affects Versions: master
Reporter: William Saar

 The line that causes this is
 DataStream<O> outputStream = insideIterationStream.filter(outputFilter).map(m -> m.outputMessage);
 The problem occurs both when compiled with javac and with Eclipse's JDT compiler 
 (in an environment where simple lambda type tests work).
 Exception in thread main java.lang.ArrayIndexOutOfBoundsException: -1
   at java.util.ArrayList.elementData(Unknown Source)
   at java.util.ArrayList.get(Unknown Source)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91)
   at 
 org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2280] GenericTypeComparator.compare() r...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/894#issuecomment-119865296
  
Good catch!

+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2333) Stream Data Sink that periodically rolls files

2015-07-09 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2333:
---

 Summary: Stream Data Sink that periodically rolls files 
 Key: FLINK-2333
 URL: https://issues.apache.org/jira/browse/FLINK-2333
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen


It would be useful to have a file data sink for streams that starts a new file 
every n elements or records.
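A minimal sketch of the requested behavior (names hypothetical, not the eventual 
Flink sink): a writer that closes the current part file and opens a new one 
every n records.
{code}
import java.io.FileWriter;
import java.io.IOException;

class RollingFileWriter {

    private final String basePath;
    private final long recordsPerFile;
    private long count = 0;
    private int fileIndex = 0;
    private FileWriter current;

    RollingFileWriter(String basePath, long recordsPerFile) {
        this.basePath = basePath;
        this.recordsPerFile = recordsPerFile;
    }

    void write(String record) throws IOException {
        if (current == null || count == recordsPerFile) {
            roll();  // start a new part file every n records
        }
        current.write(record + "\n");
        count++;
    }

    private void roll() throws IOException {
        if (current != null) {
            current.close();
        }
        current = new FileWriter(basePath + "." + fileIndex++);
        count = 0;
    }

    void close() throws IOException {
        if (current != null) {
            current.close();
        }
    }
}
{code}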



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2293) Division by Zero Exception

2015-07-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620116#comment-14620116
 ] 

Stephan Ewen commented on FLINK-2293:
-

[~andralungu] The line numbers in your stack trace (where the exception occurs, 
and the frame below as well) correspond to empty lines or comment lines in the 
source code of the latest master.

See here: 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java#L836

Are you sure you are using the code after I added the patch?

 Division by Zero Exception
 --

 Key: FLINK-2293
 URL: https://issues.apache.org/jira/browse/FLINK-2293
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Assignee: Stephan Ewen
Priority: Critical
 Fix For: 0.10, 0.9.1


 I am basically running an algorithm that simulates a Gather Sum Apply 
 Iteration that performs Triangle Count (why simulate it? Because you just 
 need a superstep - useless overhead if you use the runGatherSumApply 
 function in Graph).
 What happens, at a high level:
 1). Select neighbors with an ID greater than the one corresponding to the 
 current vertex;
 2). Propagate the received values to neighbors with a higher ID;
 3). Compute the number of triangles by checking whether
 trgVertex.getValue().get(srcVertex.getId());
 As you can see, I *do not* perform any division at all;
 code is here: 
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
 Now for small graphs, 50MB max, the computation finishes nicely with the 
 correct result. For a 10GB graph, however, I got this:
 java.lang.ArithmeticException: / by zero
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:836)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:819)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544)
 at 
 org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
 at 
 org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
 at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
 at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
 at java.lang.Thread.run(Thread.java:722)
 see the full log here: https://gist.github.com/andralungu/984774f6348269df7951



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2293) Division by Zero Exception

2015-07-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620117#comment-14620117
 ] 

Stephan Ewen commented on FLINK-2293:
-

You can look at the beginning of the log; it writes out the code revision. Can 
you post which one it is?

 Division by Zero Exception
 --

 Key: FLINK-2293
 URL: https://issues.apache.org/jira/browse/FLINK-2293
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Assignee: Stephan Ewen
Priority: Critical
 Fix For: 0.10, 0.9.1


 I am basically running an algorithm that simulates a Gather Sum Apply 
 Iteration that performs Triangle Count (Why simulate it? Because you just 
 need a superstep - useless overhead if you use the runGatherSumApply 
 function in Graph).
 What happens, at a high level:
 1). Select neighbors with ID greater than the one corresponding to the 
 current vertex;
 2). Propagate the received values to neighbors with higher ID;
 3). Compute the number of triangles by checking whether
 trgVertex.getValue().get(srcVertex.getId());
 As you can see, I *do not* perform any division at all;
 code is here: 
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
 Now for small graphs, 50MB max, the computation finishes nicely with the 
 correct result. For a 10GB graph, however, I got this:
 java.lang.ArithmeticException: / by zero
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:836)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:819)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
 at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544)
 at 
 org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
 at 
 org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
 at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
 at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
 at java.lang.Thread.run(Thread.java:722)
 see the full log here: https://gist.github.com/andralungu/984774f6348269df7951



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2280] GenericTypeComparator.compare() r...

2015-07-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/894


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming

2015-07-09 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora resolved FLINK-1421.
---
Resolution: Done

 Implement a SAMOA Adapter for Flink Streaming
 -

 Key: FLINK-1421
 URL: https://issues.apache.org/jira/browse/FLINK-1421
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Reporter: Paris Carbone
Assignee: Paris Carbone
   Original Estimate: 336h
  Remaining Estimate: 336h

 Yahoo's Samoa is an experimental incremental machine learning library that 
 builds on an abstract compositional data streaming model to write streaming 
 algorithms. The task is to provide an adapter from SAMOA topologies to 
 Flink-streaming job graphs in order to support Flink as a backend engine for 
 SAMOA tasks.
 A startup guide can be viewed here:
 https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub
 The main working branch of the adapter :
 https://github.com/senorcarbone/samoa/tree/flink-integration



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2328) Applying more than one transformation on an IterativeDataStream fails

2015-07-09 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-2328:
-

Assignee: Gyula Fora

 Applying more than one transformation on an IterativeDataStream fails
 -

 Key: FLINK-2328
 URL: https://issues.apache.org/jira/browse/FLINK-2328
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Gyula Fora
Assignee: Gyula Fora

 Currently the user cannot apply more than one transformation on the 
 IterativeDataStream directly.
 It fails because instead of creating one iteration source and connecting it 
 to the operators it will try to create two iteration sources which fails on 
 the shared broker slot.
 A workaround is to use a no-op mapper on the iterative stream and apply 
 the two transformations on that, as sketched below.
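A minimal sketch of that workaround, assuming the current (0.10-era)
DataStream API — the user functions and element type are illustrative:

{code}
IterativeDataStream<Long> it = source.iterate();

// Funnel the iteration through a single no-op mapper so that only one
// iteration source is created; then branch off the mapped stream.
DataStream<Long> head = it.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        return value; // no-op
    }
});

DataStream<Long> evens = head.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) {
        return value % 2 == 0;
    }
});

DataStream<Long> incremented = head.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        return value + 1;
    }
});

it.closeWith(incremented); // feedback edge, as usual
{code}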



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2328) Applying more than one transformation on an IterativeDataStream fails

2015-07-09 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-2328:
--
Component/s: Streaming

 Applying more than one transformation on an IterativeDataStream fails
 -

 Key: FLINK-2328
 URL: https://issues.apache.org/jira/browse/FLINK-2328
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Gyula Fora
Assignee: Gyula Fora

 Currently the user cannot apply more than one transformation on the 
 IterativeDataStream directly.
 It fails because instead of creating one iteration source and connecting it 
 to the operators it will try to create two iteration sources which fails on 
 the shared broker slot.
 A workaround is to use a no-op mapper on the iterative stream and apply 
 the two transformations on that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2235) Local Flink cluster allocates too much memory

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620163#comment-14620163
 ] 

ASF GitHub Bot commented on FLINK-2235:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/859#issuecomment-119883278
  
Typically, programs can allocate as much memory as they like. We only take 
a fraction of the free physical memory for the managed memory. We could also 
take only half of the physical memory. Or, alternatively, fail with an 
exception that the maximum memory for the JVM is not set (-Xmx is missing). In 
my opinion, it is ok to take a fraction of the physical memory for local 
execution.
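A rough sketch of the heuristic being discussed (plain Java; the fraction and
variable names are illustrative, not Flink's actual code):

```java
Runtime rt = Runtime.getRuntime();

// Heap that is still free right now: the configured maximum minus what is
// already in use.
long freeHeap = rt.maxMemory() - (rt.totalMemory() - rt.freeMemory());

// Take only a fraction of it for the managed memory, e.g. 70 %.
long managedMemorySize = (long) (freeHeap * 0.7);
```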


 Local Flink cluster allocates too much memory
 -

 Key: FLINK-2235
 URL: https://issues.apache.org/jira/browse/FLINK-2235
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime, TaskManager
Affects Versions: 0.9
 Environment: Oracle JDK: 1.6.0_65-b14-462
 Eclipse
Reporter: Maximilian Michels
Priority: Minor

 When executing a Flink job locally, the task manager gets initialized with an 
 insane amount of memory. After a quick look in the code it seems that the 
 call to {{EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()}} 
 returns a wrong estimate of the heap memory size.
 Moreover, the same user switched to Oracle JDK 1.8 and that made the error 
 disappear. So I'm guessing this is some Java 1.6 quirk.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2235] fix calculation of free memory fo...

2015-07-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/859#issuecomment-119883278
  
Typically, programs can allocate as much memory as they like. We only take 
a fraction of the free physical memory for the managed memory. We could also 
take only half of the physical memory. Or, alternatively, fail with an 
exception that the maximum memory for the JVM is not set (-Xmx is missing). In 
my opinion, it is ok to take a fraction of the physical memory for local 
execution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2305) Add documenation about Storm compatibility layer

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620102#comment-14620102
 ] 

ASF GitHub Bot commented on FLINK-2305:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/884#issuecomment-119870990
  
Let's make it a separate issue.

If there is a followup on these two issues, we can merge this, in my 
opinion.


 Add documenation about Storm compatibility layer
 

 Key: FLINK-2305
 URL: https://issues.apache.org/jira/browse/FLINK-2305
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax

 The Storm compatibility layer is currently not documented on the project web site.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2305] Add documenation about Storm comp...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/884#issuecomment-119870990
  
Let's make it a separate issue.

If there is a followup on these two issues, we can merge this, in my 
opinion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2280) GenericTypeComparator.compare() does not respect ascending flag

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620135#comment-14620135
 ] 

ASF GitHub Bot commented on FLINK-2280:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/894


 GenericTypeComparator.compare() does not respect ascending flag
 ---

 Key: FLINK-2280
 URL: https://issues.apache.org/jira/browse/FLINK-2280
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 0.9
Reporter: Fabian Hueske
Assignee: Fabian Hueske
Priority: Critical
 Fix For: 0.9.1


 The {{GenericTypeComparator.compare()}} method does not respect the 
 {{ascending}} flag that is used for inverted sorting.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2280) GenericTypeComparator.compare() does not respect ascending flag

2015-07-09 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-2280.

Resolution: Fixed

Fixed with 590c6d0644e59d0bc3b5b6f4869f91bc38864232

 GenericTypeComparator.compare() does not respect ascending flag
 ---

 Key: FLINK-2280
 URL: https://issues.apache.org/jira/browse/FLINK-2280
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 0.9
Reporter: Fabian Hueske
Assignee: Fabian Hueske
Priority: Critical
 Fix For: 0.9.1


 The {{GenericTypeComparator.compare()}} method does not respect the 
 {{ascending}} flag that is used for inverted sorting.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2338) Shut down Storm Topologies cleanly

2015-07-09 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated FLINK-2338:
---
Summary: Shut down Storm Topologies cleanly  (was: Shut down Storm 
Topologies clenaly)

 Shut down Storm Topologies cleanly
 

 Key: FLINK-2338
 URL: https://issues.apache.org/jira/browse/FLINK-2338
 Project: Flink
  Issue Type: Improvement
  Components: flink-contrib
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
Priority: Minor

 Currently, it is not possible to stop a Flink streaming program in a clean 
 way. Thus, emulating Storm's kill command is done the hard way, resulting 
 in the following exception shown in the log:
 org.apache.flink.runtime.client.JobExecutionException: Communication with 
 JobManager failed: Lost connection to JobManager akka://flink/user/jobmanager
 at 
 org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:169)
 at 
 org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:205)
 at 
 org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:195)
 at 
 org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:116)
 Caused by: java.lang.Exception: Lost connection to JobManager 
 akka://flink/user/jobmanager
 at 
 org.apache.flink.runtime.client.JobClientActor.onReceive(JobClientActor.java:131)
 at 
 akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at 
 akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
 at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
 at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
 at akka.actor.ActorCell.invoke(ActorCell.scala:486)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 The exception is expected currently. However, a clean kill is preferable. 
 This can be done after the new STOP signal is available 
 (https://issues.apache.org/jira/browse/FLINK-2111).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2162) Implement adaptive learning rate strategies for SGD

2015-07-09 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-2162:
-
Assignee: Ventura Del Monte

 Implement adaptive learning rate strategies for SGD
 ---

 Key: FLINK-2162
 URL: https://issues.apache.org/jira/browse/FLINK-2162
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Ventura Del Monte
Priority: Minor
  Labels: ML

 At the moment, the SGD implementation uses a simple adaptive learning rate 
 strategy, {{adaptedLearningRate = 
 initialLearningRate/sqrt(iterationNumber)}}, which makes the optimization 
 algorithm sensitive to the setting of the {{initialLearningRate}}. If this 
 value is chosen badly, the SGD might become unstable.
 There are better ways to calculate the learning rate [1], such as Adagrad [3], 
 Adadelta [4], SGD with momentum [5], and others [2]. They promise to result in 
 more stable optimization algorithms which don't require as much 
 hyperparameter tweaking. It might be worthwhile to investigate these 
 approaches (a short sketch follows the resource list below).
 It might also be interesting to look at the implementation of vowpal wabbit 
 [6].
 Resources:
 [1] [http://imgur.com/a/Hqolp]
 [2] [http://cs.stanford.edu/people/karpathy/convnetjs/demo/trainers.html]
 [3] [http://www.jmlr.org/papers/volume12/duchi11a/duchi11a.pdf]
 [4] [http://www.matthewzeiler.com/pubs/googleTR2012/googleTR2012.pdf]
 [5] [http://www.willamette.edu/~gorr/classes/cs449/momrate.html]
 [6] [https://github.com/JohnLangford/vowpal_wabbit]
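A short, illustrative sketch of the two kinds of strategies mentioned above
(plain Java; not taken from any of the cited implementations):

{code}
// Current strategy: decay the initial rate with the inverse square root
// of the iteration number.
static double inverseSqrtDecay(double initialLearningRate, int iteration) {
    return initialLearningRate / Math.sqrt(iteration);
}

// Adagrad: keep a per-coordinate sum of squared gradients and shrink the
// step size where gradients have historically been large.
static void adagradUpdate(double[] weights, double[] gradient,
                          double[] squaredGradientSum, double initialLearningRate) {
    final double eps = 1e-8; // guards against division by zero
    for (int i = 0; i < weights.length; i++) {
        squaredGradientSum[i] += gradient[i] * gradient[i];
        weights[i] -= initialLearningRate
                / (Math.sqrt(squaredGradientSum[i]) + eps) * gradient[i];
    }
}
{code}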



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2162) Implement adaptive learning rate strategies for SGD

2015-07-09 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620427#comment-14620427
 ] 

Till Rohrmann commented on FLINK-2162:
--

Great to hear [~ventura] :-) I've assigned you the issue.

 Implement adaptive learning rate strategies for SGD
 ---

 Key: FLINK-2162
 URL: https://issues.apache.org/jira/browse/FLINK-2162
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Ventura Del Monte
Priority: Minor
  Labels: ML

 At the moment, the SGD implementation uses a simple adaptive learning rate 
 strategy, {{adaptedLearningRate = 
 initialLearningRate/sqrt(iterationNumber)}}, which makes the optimization 
 algorithm sensitive to the setting of the {{initialLearningRate}}. If this 
 value is chosen badly, the SGD might become unstable.
 There are better ways to calculate the learning rate [1], such as Adagrad [3], 
 Adadelta [4], SGD with momentum [5], and others [2]. They promise to result in 
 more stable optimization algorithms which don't require as much 
 hyperparameter tweaking. It might be worthwhile to investigate these 
 approaches.
 It might also be interesting to look at the implementation of vowpal wabbit 
 [6].
 Resources:
 [1] [http://imgur.com/a/Hqolp]
 [2] [http://cs.stanford.edu/people/karpathy/convnetjs/demo/trainers.html]
 [3] [http://www.jmlr.org/papers/volume12/duchi11a/duchi11a.pdf]
 [4] [http://www.matthewzeiler.com/pubs/googleTR2012/googleTR2012.pdf]
 [5] [http://www.willamette.edu/~gorr/classes/cs449/momrate.html]
 [6] [https://github.com/JohnLangford/vowpal_wabbit]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2337) Multiple SLF4J bindings using Storm compatibility layer

2015-07-09 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created FLINK-2337:
--

 Summary: Multiple SLF4J bindings using Storm compatibility layer
 Key: FLINK-2337
 URL: https://issues.apache.org/jira/browse/FLINK-2337
 Project: Flink
  Issue Type: Bug
  Components: flink-contrib
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
Priority: Minor


Storm depends on logback as its SLF4J implementation, but Flink uses log4j. The log 
shows the following conflict:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/home/cicero/.m2/repository/ch/qos/logback/logback-classic/1.0.13/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/home/cicero/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

We need to exclude logback from the Storm dependencies to fix this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2338) Shut down Storm Topologies clenaly

2015-07-09 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created FLINK-2338:
--

 Summary: Shut down Storm Topologies clenaly
 Key: FLINK-2338
 URL: https://issues.apache.org/jira/browse/FLINK-2338
 Project: Flink
  Issue Type: Improvement
  Components: flink-contrib
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
Priority: Minor


Currently, it is not possible to stop a Flink streaming program in a clean way. 
Thus, emulating Storm's kill command is done the hard way, resulting in the 
following exception shown in the log:

org.apache.flink.runtime.client.JobExecutionException: Communication with 
JobManager failed: Lost connection to JobManager akka://flink/user/jobmanager
at 
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:169)
at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:205)
at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:195)
at 
org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:116)
Caused by: java.lang.Exception: Lost connection to JobManager 
akka://flink/user/jobmanager
at 
org.apache.flink.runtime.client.JobClientActor.onReceive(JobClientActor.java:131)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at 
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The exception is expected currently. However, a clean kill is preferable. 
This can be done after the new STOP signal is available 
(https://issues.apache.org/jira/browse/FLINK-2111).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2305) Add documenation about Storm compatibility layer

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620418#comment-14620418
 ] 

ASF GitHub Bot commented on FLINK-2305:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/884#issuecomment-119943363
  
I just created two JIRAs:
- SLF4J: https://issues.apache.org/jira/browse/FLINK-2337
- JobExecutionException: https://issues.apache.org/jira/browse/FLINK-2338

This PR should be ready to get merged.


 Add documenation about Storm compatibility layer
 

 Key: FLINK-2305
 URL: https://issues.apache.org/jira/browse/FLINK-2305
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax

 The Storm compatibility layer is currently not documented on the project web site.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2305] Add documenation about Storm comp...

2015-07-09 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/884#issuecomment-119943363
  
I just created two JIRAs:
- SLF4J: https://issues.apache.org/jira/browse/FLINK-2337
- JobExecutionException: https://issues.apache.org/jira/browse/FLINK-2338

This PR should be ready to get merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2336) ArrayIndexOufOBoundsException in TypeExtractor when mapping

2015-07-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620389#comment-14620389
 ] 

Stephan Ewen commented on FLINK-2336:
-

Can you post a simplified data type that reproduces this problem? Then we can 
make a patch to fix it!

 ArrayIndexOufOBoundsException in TypeExtractor when mapping
 ---

 Key: FLINK-2336
 URL: https://issues.apache.org/jira/browse/FLINK-2336
 Project: Flink
  Issue Type: Bug
Affects Versions: master
Reporter: William Saar

 The line that causes this is:
 DataStream<O> outputStream = insideIterationStream.filter(outputFilter).map(m -> m.outputMessage);
 The problem occurs both when compiled using javac and with Eclipse's JDT compiler (in 
 an environment where simple lambda type tests work).
 Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
   at java.util.ArrayList.elementData(Unknown Source)
   at java.util.ArrayList.get(Unknown Source)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:553)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:468)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:370)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:254)
   at 
 org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:91)
   at 
 org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:605)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2235] fix calculation of free memory fo...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/859#issuecomment-119891011
  
Okay, from skimming over some Oracle docs, it seems the default max heap is 
1/4 of the physical memory. Let's use that.
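For illustration, a hedged sketch of that fallback (the cast goes through a
sun-internal interface, so this is not portable to every JVM):

```java
import java.lang.management.ManagementFactory;

// If -Xmx was not set, approximate the JVM default: 1/4 of physical memory.
long physicalMemory = ((com.sun.management.OperatingSystemMXBean)
        ManagementFactory.getOperatingSystemMXBean()).getTotalPhysicalMemorySize();
long fallbackHeapSize = physicalMemory / 4;
```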


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2235) Local Flink cluster allocates too much memory

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620197#comment-14620197
 ] 

ASF GitHub Bot commented on FLINK-2235:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/859#issuecomment-119891011
  
Okay, from skimming over some Oracle docs, it seems the default max heap is 
1/4 of the physical memory. Let's use that.


 Local Flink cluster allocates too much memory
 -

 Key: FLINK-2235
 URL: https://issues.apache.org/jira/browse/FLINK-2235
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime, TaskManager
Affects Versions: 0.9
 Environment: Oracle JDK: 1.6.0_65-b14-462
 Eclipse
Reporter: Maximilian Michels
Priority: Minor

 When executing a Flink job locally, the task manager gets initialized with an 
 insane amount of memory. After a quick look in the code it seems that the 
 call to {{EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()}} 
 returns a wrong estimate of the heap memory size.
 Moreover, the same user switched to Oracle JDK 1.8 and that made the error 
 disappear. So I'm guessing this is some Java 1.6 quirk.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2008][FLINK-2296] Fix checkpoint commit...

2015-07-09 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/895

[FLINK-2008][FLINK-2296] Fix checkpoint committing & KafkaITCase [wip]



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink2008

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/895.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #895


commit 0d5773a56acb9f6d7592de9bf5a93f04e6600ca1
Author: Robert Metzger rmetz...@apache.org
Date:   2015-06-29T14:52:38Z

[FLINK-2008][FLINK-2296] Fix checkpoint committing & KafkaITCase




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620459#comment-14620459
 ] 

ASF GitHub Bot commented on FLINK-2008:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/895

[FLINK-2008][FLINK-2296] Fix checkpoint committing & KafkaITCase [wip]



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink2008

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/895.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #895


commit 0d5773a56acb9f6d7592de9bf5a93f04e6600ca1
Author: Robert Metzger rmetz...@apache.org
Date:   2015-06-29T14:52:38Z

[FLINK-2008][FLINK-2296] Fix checkpoint committing & KafkaITCase




 PersistentKafkaSource is sometimes emitting tuples multiple times
 -

 Key: FLINK-2008
 URL: https://issues.apache.org/jira/browse/FLINK-2008
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

 The PersistentKafkaSource is expected to emit records exactly once.
 Two test cases of the KafkaITCase are sporadically failing because records 
 are emitted multiple times.
 Affected tests:
 {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been 
 changed manually in ZK:
 {code}
 java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 2]
 {code}
 {{brokerFailureTest()}} also fails:
 {code}
 05/13/2015 08:13:16   Custom source - Stream Sink(1/1) switched to FAILED 
 java.lang.AssertionError: Received tuple with value 21 twice
   at org.junit.Assert.fail(Assert.java:88)
   at org.junit.Assert.assertTrue(Assert.java:41)
   at org.junit.Assert.assertFalse(Assert.java:64)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
   at 
 org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
   at 
 org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
   at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
   at java.lang.Thread.run(Thread.java:745)
 {code}
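For context, the kind of exactly-once check the failing sink performs can be
sketched like this (illustrative only, not the actual test code):

{code}
import java.util.BitSet;

// Remember every value seen so far and fail as soon as one arrives twice.
final BitSet seenValues = new BitSet();

void checkExactlyOnce(int value) {
    if (seenValues.get(value)) {
        throw new AssertionError("Received tuple with value " + value + " twice");
    }
    seenValues.set(value);
}
{code}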



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620523#comment-14620523
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/896

[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.

The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (flink metrics defined in the invocables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.
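For illustration, an external (user-defined) accumulator registered through
the RuntimeContext — with this change its value is reported live instead of
only at job completion:

```java
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class TokenCounter extends RichFlatMapFunction<String, String> {

    private final IntCounter numTokens = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        // Registered accumulators are now shipped periodically with the heartbeat.
        getRuntimeContext().addAccumulator("num-tokens", numTokens);
    }

    @Override
    public void flatMap(String line, Collector<String> out) {
        for (String token : line.split(" ")) {
            numTokens.add(1);
            out.collect(token);
        }
    }
}
```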

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #896


commit 7cec1236f087e72b40022bf02a6dbb12d74acbac
Author: Maximilian Michels m...@apache.org
Date:   2015-07-08T07:23:42Z

[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.

The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (flink metrics defined in the invocables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.




 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/896

[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.

The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (flink metrics defined in the invocables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #896


commit 7cec1236f087e72b40022bf02a6dbb12d74acbac
Author: Maximilian Michels m...@apache.org
Date:   2015-07-08T07:23:42Z

[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.

The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (flink metrics defined in the invocables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34272308
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1017,6 +1040,9 @@ object TaskManager {
 
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds
 
+  /* Interval to send accumulators to the job manager  */
+  val ACCUMULATOR_REPORT_INTERVAL: FiniteDuration = 10 seconds
--- End diff --

This variable is nowhere ever used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620762#comment-14620762
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34272308
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1017,6 +1040,9 @@ object TaskManager {
 
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds
 
+  /* Interval to send accumulators to the job manager  */
+  val ACCUMULATOR_REPORT_INTERVAL: FiniteDuration = 10 seconds
--- End diff --

This variable is nowhere ever used.


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273136
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -601,6 +636,11 @@ void markFinished() {
}
}
 
+   synchronized (accumulatorLock) {
--- End diff --

This lock here seems redundant. No place is looking for those two to be in 
sync.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620819#comment-14620819
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120065197
  
This pattern seems to repeat in many places:
```java
AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
AccumulatorRegistry.Internal internalRegistry = 
accumulatorRegistry.getInternal();
this.recordsOutCounter = 
internalRegistry.createLongCounter(AccumulatorRegistry.Internal.NUM_RECORDS_OUT);
this.bytesOutCounter = 
internalRegistry.createLongCounter(AccumulatorRegistry.Internal.NUM_BYTES_OUT);

AccumulatorRegistry.External externalRegistry = 
accumulatorRegistry.getExternal();
this.accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
externalRegistry.setMap(this.accumulatorMap);
```

I think the code would be simpler if the registry simply always had a 
created map for internal and external accumulators. Also, a reporter object 
would help. The code would then look like:
```java
AccumulatorRegistry accumulators = getEnvironment().getAccumulatorRegistry();
ReadWriteReporter reporter = accumulators.getReadWriteReporter();
writer.setStatsReporter(reporter);
this.accumulatorMap = accumulators.getUserMap();
```


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-120015233
  
Alright, I've opened a JIRA for this: 
https://issues.apache.org/jira/browse/FLINK-2340


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2329) Refactor RPCs from within the ExecutionGraph

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620638#comment-14620638
 ] 

ASF GitHub Bot commented on FLINK-2329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/893#discussion_r34264492
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---
@@ -43,8 +41,8 @@
/** The lock on which to synchronize allocations and failure state 
changes */
private final Object instanceLock = new Object();
 
-   /** The actor ref to the task manager represented by this taskManager. 
*/
-   private final ActorRef taskManager;
+   /** The instacne gateway to communicate with the instance */
--- End diff --

typo


 Refactor RPCs from within the ExecutionGraph
 

 Key: FLINK-2329
 URL: https://issues.apache.org/jira/browse/FLINK-2329
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 0.10


 Currently, we store an {{ActorRef}} of the TaskManager into an {{Instance}} 
 object. This {{ActorRef}} is used from within {{Executions}} to interact with 
 the {{TaskManager}}. This is not a nice abstraction since it does not hide 
 implementation details. 
 Since we need to add a leader session ID to messages sent by the 
 {{Executions}} in order to support high availability, we would need to make 
 the leader session ID available to the {{Execution}}. A better solution seems 
 to be to replace the direct {{ActorRef}} interaction with an instance gateway 
 abstraction which encapsulates the communication logic. Having such an 
 abstraction, it will be easy to decorate messages transparently with a leader 
 session ID. Therefore, I propose to refactor the current {{Instance}} 
 communication and to introduce an {{InstanceGateway}} abstraction.
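A hypothetical sketch of what such an abstraction could look like (method and
class names are illustrative, not the final interface; {{Future}} and
{{FiniteDuration}} are the usual Scala/Akka types):

{code}
public interface InstanceGateway {
    void tell(Object message);                                   // fire-and-forget
    Future<Object> ask(Object message, FiniteDuration timeout);  // request/response
}

// Decorator that transparently wraps every outgoing message with the
// leader session ID, without the Execution ever seeing an ActorRef.
public class LeaderSessionGateway implements InstanceGateway {

    private final InstanceGateway delegate;
    private final UUID leaderSessionID;

    public LeaderSessionGateway(InstanceGateway delegate, UUID leaderSessionID) {
        this.delegate = delegate;
        this.leaderSessionID = leaderSessionID;
    }

    @Override
    public void tell(Object message) {
        delegate.tell(new LeaderSessionMessage(leaderSessionID, message));
    }

    @Override
    public Future<Object> ask(Object message, FiniteDuration timeout) {
        return delegate.ask(new LeaderSessionMessage(leaderSessionID, message), timeout);
    }
}
{code}

LeaderSessionMessage here is assumed to be a simple envelope carrying the
session ID and the original payload.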



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

