[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131496338
  
I'd like to get this merged soon. This removes multiple constructors for 
Runtime contexts and establishes a clean hierarchy, making any changes to the 
constructors easier. This will be useful for two Jiras on exposing task 
configuration and task attempt number to the Runtime context.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698561#comment-14698561
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131496338
  
I'd like to get this merged soon. This removes multiple constructors for 
Runtime contexts and establishes a clean hierarchy, making any changes to the 
constructors easier. This will be useful for two Jiras on exposing task 
configuration and task attempt number to the Runtime context.


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.
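
A minimal stand-in for the failure mode described above (plain Java; the class and method names here are hypothetical, not Flink's actual API): when a nested runtime context is constructed without the distributed-cache entries, later lookups fail as if the file had never been registered, and the fix is simply to pass the existing entries on.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a runtime context holding distributed-cache entries.
class CacheContext {
    private final Map<String, String> cacheEntries;

    CacheContext(Map<String, String> cacheEntries) {
        this.cacheEntries = cacheEntries;
    }

    // Mirrors the failure mode: an entry that was never passed on cannot be found.
    String getFile(String name) {
        String path = cacheEntries.get(name);
        if (path == null) {
            throw new IllegalArgumentException("File with name '" + name + "' is not available.");
        }
        return path;
    }

    // The bug: the nested (iteration) context is built with an empty entry map.
    CacheContext forIterationBroken() {
        return new CacheContext(new HashMap<String, String>());
    }

    // The fix: pass the existing entries on to the nested context.
    CacheContext forIterationFixed() {
        return new CacheContext(new HashMap<String, String>(cacheEntries));
    }
}
```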



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2531) combining the if branch to improve the performance

2015-08-16 Thread zhangrucong (JIRA)
zhangrucong created FLINK-2531:
--

 Summary: combining the if branch to improve the performance
 Key: FLINK-2531
 URL: https://issues.apache.org/jira/browse/FLINK-2531
 Project: Flink
  Issue Type: Bug
Reporter: zhangrucong
Priority: Minor








[GitHub] flink pull request: [flink-2532]fix the function name and the vari...

2015-08-16 Thread Rucongzhang
GitHub user Rucongzhang opened a pull request:

https://github.com/apache/flink/pull/1025

[flink-2532]fix the function name and the variable name are the same

In the class StreamWindow, the method split contains a list variable that is also 
named split. The two identical names are confusing and hurt readability.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Rucongzhang/flink flink-2532

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1025


commit 85c1c72851e8d91e9a0647ad79f2a11d629cc30e
Author: Rucongzhang zhangruc...@huawei.com
Date:   2015-08-16T08:44:00Z

[flink-2532]fix the function name and the variable name are the same






[jira] [Commented] (FLINK-2478) The page “FlinkML - Machine Learning for Flink“ https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698546#comment-14698546
 ] 

ASF GitHub Bot commented on FLINK-2478:
---

Github user Rucongzhang commented on the pull request:

https://github.com/apache/flink/pull/1021#issuecomment-131493503
  
Yes, Stephan, it should throw an exception. I will modify it. Thank you very 
much!


 The page “FlinkML - Machine Learning for Flink“  
 https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a 
 dead link
 -

 Key: FLINK-2478
 URL: https://issues.apache.org/jira/browse/FLINK-2478
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 0.10
Reporter: Slim Baltagi
Assignee: Till Rohrmann
Priority: Minor

 Note that FlinkML is currently not part of the binary distribution. See 
 linking with it for cluster execution here.
 'here' links to a dead link: 
 https://ci.apache.org/projects/flink/flink-docs-master/libs/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
 The correct link is: 
 https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution





[GitHub] flink pull request: [FLINK-2478]fix the array may have out of boun...

2015-08-16 Thread Rucongzhang
Github user Rucongzhang commented on the pull request:

https://github.com/apache/flink/pull/1021#issuecomment-131493503
  
Yes, Stephan, it should throw an exception. I will modify it. Thank you very 
much!




[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698556#comment-14698556
 ] 

ASF GitHub Bot commented on FLINK-2526:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1020#issuecomment-131495368
  
@StephanEwen  Thanks. I think your pull request has fixed this. I will close 
my pull request and review your code.


 Add catch{} for task when it stop running 
 --

 Key: FLINK-2526
 URL: https://issues.apache.org/jira/browse/FLINK-2526
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[GitHub] flink pull request: [FLINK-2526]Add try-catch for task when it sto...

2015-08-16 Thread ffbin
Github user ffbin closed the pull request at:

https://github.com/apache/flink/pull/1020




[GitHub] flink pull request: [FLINK-2526]Add try-catch for task when it sto...

2015-08-16 Thread ffbin
Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1020#issuecomment-131495368
  
@StephanEwen  Thanks. I think your pull request has fixed this. I will close 
my pull request and review your code.




[jira] [Created] (FLINK-2530) optimize equal() of AcknowledgeCheckpoint

2015-08-16 Thread fangfengbin (JIRA)
fangfengbin created FLINK-2530:
--

 Summary: optimize equal() of AcknowledgeCheckpoint
 Key: FLINK-2530
 URL: https://issues.apache.org/jira/browse/FLINK-2530
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor








[jira] [Commented] (FLINK-2531) combining the if branch to improve the performance

2015-08-16 Thread Henry Saputra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698569#comment-14698569
 ] 

Henry Saputra commented on FLINK-2531:
--

Could you please add more details on how to reproduce this?

 combining the if branch to improve the performance
 

 Key: FLINK-2531
 URL: https://issues.apache.org/jira/browse/FLINK-2531
 Project: Flink
  Issue Type: Bug
Reporter: zhangrucong
Priority: Minor







[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698557#comment-14698557
 ] 

ASF GitHub Bot commented on FLINK-2526:
---

Github user ffbin closed the pull request at:

https://github.com/apache/flink/pull/1020


 Add catch{} for task when it stop running 
 --

 Key: FLINK-2526
 URL: https://issues.apache.org/jira/browse/FLINK-2526
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698566#comment-14698566
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37143081
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;
 
     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 
-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();
 
-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 
-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

I don't think that's necessary, as it is just an index starting at 1. The 
possible values 1 and 2 clearly indicate which inputList the reader is 
added to.


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not reported. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.





[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37143081
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;
 
     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 
-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();
 
-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 
-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

I don't think that's necessary, as it is just an index starting at 1. The 
possible values 1 and 2 clearly indicate which inputList the reader is 
added to.




[jira] [Closed] (FLINK-2531) combining the if branch to improve the performance

2015-08-16 Thread Henry Saputra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Saputra closed FLINK-2531.

Resolution: Cannot Reproduce

Closing this until the reporter adds more details on how to reproduce the issue.

Feel free to re-open when more information is available.

 combining the if branch to improve the performance
 

 Key: FLINK-2531
 URL: https://issues.apache.org/jira/browse/FLINK-2531
 Project: Flink
  Issue Type: Bug
Reporter: zhangrucong
Priority: Minor







[jira] [Assigned] (FLINK-2524) Add getTaskNameWithSubtasks() to RuntimeContext

2015-08-16 Thread Sachin Goel (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Goel reassigned FLINK-2524:
--

Assignee: Sachin Goel

 Add getTaskNameWithSubtasks() to RuntimeContext
 -

 Key: FLINK-2524
 URL: https://issues.apache.org/jira/browse/FLINK-2524
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Sachin Goel
  Labels: easyfix, starter
 Fix For: 0.10


 When printing information to logs or debug output, one frequently needs to 
 identify the statement with the originating task (task name and which 
 subtask).
 In many places, the system and user code manually construct something like 
 MyTask (2/7).
 The {{RuntimeContext}} should offer this, because it is too frequently needed.
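
A sketch of the string construction the proposed accessor would centralize. The method signature and the zero-based subtask index below are assumptions; only the "MyTask (2/7)" format is taken from the issue description.

```java
// Hypothetical helper distilled from FLINK-2524's example output "MyTask (2/7)".
class TaskNameFormatter {
    static String getTaskNameWithSubtasks(String taskName, int subtaskIndex, int parallelism) {
        // Subtask indices are typically zero-based internally but displayed one-based.
        return taskName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
    }
}
```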





[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698600#comment-14698600
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37143936
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
--- End diff --

This is to provide access to the task attempt number from the runtime context. 
I should add a description of the other tickets this resolves.
Is it a good idea, though, to fix five issues in one PR? Or should I open 
a separate one and keep this one for just the distributed cache?


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37143936
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
--- End diff --

This is to provide access to the task attempt number from the runtime context. 
I should add a description of the other tickets this resolves.
Is it a good idea, though, to fix five issues in one PR? Or should I open 
a separate one and keep this one for just the distributed cache?




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread ffbin
Github user ffbin commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37142604
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;
 
     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 
-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();
 
-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 
-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

What about using an enum instead of 2? It would be easier to understand.
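
One way to read the enum suggestion is sketched below. This is illustrative only: Flink's StreamEdge type numbers are plain ints, and the enum and its names are hypothetical, not actual Flink code.

```java
// Hypothetical enum naming the two edge type numbers instead of the bare
// literals 1 and 2 used in the switch statement above.
enum InputTypeNumber {
    FIRST(1), SECOND(2);

    final int number;

    InputTypeNumber(int number) {
        this.number = number;
    }

    // Maps a raw type number to its named constant, rejecting anything else,
    // mirroring the default branch of the original switch.
    static InputTypeNumber fromNumber(int n) {
        for (InputTypeNumber t : values()) {
            if (t.number == n) {
                return t;
            }
        }
        throw new RuntimeException("Invalid input type number: " + n);
    }
}
```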




[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698554#comment-14698554
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user ffbin commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37142604
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;
 
     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 
-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();
 
-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 
-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

What about using an enum instead of 2? It would be easier to understand.


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and follow-up exceptions are not reported. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.





[GitHub] flink pull request: [FLINK-2531]combining the if branch to improve...

2015-08-16 Thread Rucongzhang
GitHub user Rucongzhang opened a pull request:

https://github.com/apache/flink/pull/1023

[FLINK-2531]combining the if branch to improve the judge logic

In the function receiveAcknowledgeMessage, the if branch 
checkpoint.isFullyAcknowledged() can be merged into the if branch 
if (checkpoint != null && !checkpoint.isDiscarded()) to improve the judgment logic.
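
The shape of the described refactoring can be sketched as follows. The boolean flags and the stand-in class are hypothetical, distilled from the PR description rather than copied from Flink's CheckpointCoordinator.

```java
// Illustrative sketch: nesting the fully-acknowledged check inside the
// existing null/discarded check instead of keeping two sibling branches.
class BranchDemo {
    static String handle(boolean exists, boolean discarded, boolean fullyAcknowledged) {
        // Before the refactoring there were two separate if statements; the
        // second re-tested state the first had already established. Merged:
        if (exists && !discarded) {
            if (fullyAcknowledged) {
                return "complete";
            }
            return "pending";
        }
        return "ignored";
    }
}
```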

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Rucongzhang/flink FLINK-2531

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1023.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1023


commit ab2eddd0ff43c97b472265c5dc7b8bd541fd76ed
Author: Rucongzhang zhangruc...@huawei.com
Date:   2015-08-16T07:56:38Z

[FLINK-2531]combining the if branch to improve the judge logic






[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698592#comment-14698592
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131504643
  
I decided to go ahead and implement things which touch the Runtime Context 
constructors with this PR. This now closes five Jiras, namely 2449, 2458, 2488, 
2496 and 2524. Commit messages are descriptive of each Jira.


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-16 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698595#comment-14698595
 ] 

Sachin Goel commented on FLINK-2488:


Implemented in https://github.com/apache/flink/pull/970

 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.
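
A sketch of the kind of restart-aware user logic this would enable. The class and method here are hypothetical, assuming only that attempt numbers start at 0 for the first execution.

```java
// Hypothetical restart-aware decision: behave normally on the first attempt,
// switch to a more defensive path (e.g. skip an expensive warm-up) on retries.
class RetryAwareLogic {
    static String strategy(int attemptNumber) {
        return attemptNumber == 0 ? "normal" : "recovery";
    }
}
```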





[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests

2015-08-16 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698596#comment-14698596
 ] 

Sachin Goel commented on FLINK-2449:


This functionality is now supported through 
https://github.com/apache/flink/pull/970

 DistCache doesn't work with JavaProgram Collection tests
 

 Key: FLINK-2449
 URL: https://issues.apache.org/jira/browse/FLINK-2449
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Sachin Goel
Priority: Minor

 When attempting to retrieve a file during the Collection test in 
 JavaProgramTestBase you'll encounter an exception claiming the file was never 
 registered.





[GitHub] flink pull request: [FLINK-2530]optimize equal() of AcknowledgeChe...

2015-08-16 Thread ffbin
GitHub user ffbin opened a pull request:

https://github.com/apache/flink/pull/1024

[FLINK-2530]optimize equal() of AcknowledgeCheckpoint

optimize  repeated check of this.state == null

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ffbin/flink FLINK-2530

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1024.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1024


commit 643d74865c414a6aa466309ef1514c98339d6995
Author: ffbin 869218...@qq.com
Date:   2015-08-16T08:01:46Z

[FLINK-2530]optimize equal() of AcknowledgeCheckpoint






[jira] [Reopened] (FLINK-2531) combining the if branch to improve the performance

2015-08-16 Thread Henry Saputra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Saputra reopened FLINK-2531:
--

PR is submitted, so re-open the issue. Please add more description to the JIRA  
issue.

 combining the if branch to improve the performance
 

 Key: FLINK-2531
 URL: https://issues.apache.org/jira/browse/FLINK-2531
 Project: Flink
  Issue Type: Bug
Reporter: zhangrucong
Priority: Minor







[GitHub] flink pull request: [FLINK-2531]combining the if branch to improve...

2015-08-16 Thread Rucongzhang
Github user Rucongzhang commented on the pull request:

https://github.com/apache/flink/pull/1023#issuecomment-131504171
  
@chiwanpark, you are right. `isFullyAcknowledged()` must be called after 
calling `acknowledgeTask()`.




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37143932
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] 
are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] 
are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
--- End diff --

I meant that the space between `private` and `[statistics]` should be 
removed. It is just cosmetic issue but to keep unified code style, we need fix 
this. :)




[GitHub] flink pull request: [FLINK-2531]combining the if branch to improve...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1023#issuecomment-131503780
  
-1 for merging. I'm not an expert on flink-runtime, so I'm not sure, but I 
don't think we can merge this PR.

Because the `acknowledgeTask` method changes the status of 
`notYetAcknowledgedTasks`, we cannot combine the two if statements. The 
`isFullyAcknowledged` method must be called after calling the `acknowledgeTask` 
method.




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131504643
  
I decided to go ahead and implement things which touch the Runtime Context 
constructors with this PR. This now closes five Jiras, namely 2449, 2458, 2488, 
2496 and 2524. Commit messages are descriptive of each Jira.




[jira] [Commented] (FLINK-2530) optimize equal() of AcknowledgeCheckpoint

2015-08-16 Thread Henry Saputra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698571#comment-14698571
 ] 

Henry Saputra commented on FLINK-2530:
--

Could you kindly add more information to the Description on how to reproduce 
this?

 optimize equal() of AcknowledgeCheckpoint
 -

 Key: FLINK-2530
 URL: https://issues.apache.org/jira/browse/FLINK-2530
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[jira] [Commented] (FLINK-2530) optimize equal() of AcknowledgeCheckpoint

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698581#comment-14698581
 ] 

ASF GitHub Bot commented on FLINK-2530:
---

GitHub user ffbin opened a pull request:

https://github.com/apache/flink/pull/1024

[FLINK-2530]optimize equal() of AcknowledgeCheckpoint

optimize  repeated check of this.state == null

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ffbin/flink FLINK-2530

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1024.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1024


commit 643d74865c414a6aa466309ef1514c98339d6995
Author: ffbin 869218...@qq.com
Date:   2015-08-16T08:01:46Z

[FLINK-2530]optimize equal() of AcknowledgeCheckpoint




 optimize equal() of AcknowledgeCheckpoint
 -

 Key: FLINK-2530
 URL: https://issues.apache.org/jira/browse/FLINK-2530
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[jira] [Commented] (FLINK-2531) combining the if branch to improve the performance

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698583#comment-14698583
 ] 

ASF GitHub Bot commented on FLINK-2531:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1023#issuecomment-131503780
  
-1 for merging. I'm not an expert on flink-runtime, so I'm not sure, but I 
don't think we can merge this PR.

Because the `acknowledgeTask` method changes the status of 
`notYetAcknowledgedTasks`, we cannot combine the two if statements. The 
`isFullyAcknowledged` method must be called after calling the `acknowledgeTask` 
method.


 combining the if branch to improve the performance
 

 Key: FLINK-2531
 URL: https://issues.apache.org/jira/browse/FLINK-2531
 Project: Flink
  Issue Type: Bug
Reporter: zhangrucong
Priority: Minor







[jira] [Commented] (FLINK-2531) combining the if branch to improve the performance

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698588#comment-14698588
 ] 

ASF GitHub Bot commented on FLINK-2531:
---

Github user Rucongzhang commented on the pull request:

https://github.com/apache/flink/pull/1023#issuecomment-131504171
  
@chiwanpark, you are right. `isFullyAcknowledged()` must be called after 
calling `acknowledgeTask()`.


 combining the if branch to improve the performance
 

 Key: FLINK-2531
 URL: https://issues.apache.org/jira/browse/FLINK-2531
 Project: Flink
  Issue Type: Bug
Reporter: zhangrucong
Priority: Minor







[jira] [Commented] (FLINK-2524) Add getTaskNameWithSubtasks() to RuntimeContext

2015-08-16 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698594#comment-14698594
 ] 

Sachin Goel commented on FLINK-2524:


Implemented in https://github.com/apache/flink/pull/970

 Add getTaskNameWithSubtasks() to RuntimeContext
 -

 Key: FLINK-2524
 URL: https://issues.apache.org/jira/browse/FLINK-2524
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Sachin Goel
  Labels: easyfix, starter
 Fix For: 0.10


 When printing information to logs or debug output, one frequently needs to 
 identify the statement with the originating task (task name and which 
 subtask).
 In many places, the system and user code manually construct something like 
 MyTask (2/7).
 The {{RuntimeContext}} should offer this, because it is too frequently needed.
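The manual construction the issue describes can be sketched as a small helper (plain Java; the helper name is illustrative, not the proposed `RuntimeContext` method):

```java
// Sketch of the "MyTask (2/7)" string that system and user code
// currently build by hand. Subtask indices are 0-based internally,
// while the display form is 1-based.
public class TaskNameFormatter {
    static String taskNameWithSubtasks(String name, int subtaskIndex, int parallelism) {
        return name + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
    }

    public static void main(String[] args) {
        System.out.println(taskNameWithSubtasks("MyTask", 1, 7)); // MyTask (2/7)
    }
}
```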





[jira] [Commented] (FLINK-2496) Expose Task Manager configuration to Runtime Context

2015-08-16 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698593#comment-14698593
 ] 

Sachin Goel commented on FLINK-2496:


This is implemented in https://github.com/apache/flink/pull/970

 Expose Task Manager configuration to Runtime Context
 

 Key: FLINK-2496
 URL: https://issues.apache.org/jira/browse/FLINK-2496
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Building upon Flink-2425, we should extend the access of Task Manager 
 configuration to the Runtime Context.





[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698598#comment-14698598
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37143896
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Why is this changed from before?


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37143896
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Why is this changed from before?




[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698614#comment-14698614
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131519550
  
Reverting back to make this PR only about the distributed cache.


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131519550
  
Reverting back to make this PR only about the distributed cache.




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-131512180
  
@sachingoel0101 There is no need to forgive. I just reviewed this PR and 
left my opinions. :) If you feel that my comments are aggressive, I'm sorry 
about that.

About the cosmetic issues, it seems okay except for the 
`MLUtils.createHistogram` method.




[jira] [Commented] (FLINK-2530) optimize equal() of AcknowledgeCheckpoint

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698603#comment-14698603
 ] 

ASF GitHub Bot commented on FLINK-2530:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1024#issuecomment-131512254
  
I don't think these statements are equivalent.

Assume that `this.state == null` and `that.state != null`.

In the original version we evaluate `that.state == null`, which is false, so 
the overall result is false.

In your version we would evaluate `(this.state == null || 
this.state.equals(that.state))`, which is true, making the overall result true.

Unless I made a mistake, -1.


 optimize equal() of AcknowledgeCheckpoint
 -

 Key: FLINK-2530
 URL: https://issues.apache.org/jira/browse/FLINK-2530
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144073
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala ---
@@ -119,4 +120,31 @@ object MLUtils {
 
 stringRepresentation.writeAsText(filePath)
   }
+
+  /** Create a [[DataSet]] of [[OnlineHistogram]] from the input data
+*
+* @param bins Number of bins required. Zero for 
[[CategoricalHistogram]]
+* @param data input [[DataSet]] of [[Double]]
+* @return [[DataSet]] of [[OnlineHistogram]]
+*/
+  private [ml] def createHistogram(data: DataSet[Double], bins: Int): 
DataSet[OnlineHistogram] = {
+val min = data.reduce((x, y) => Math.min(x, y))
+val max = data.reduce((x, y) => Math.max(x, y))
+
+val stats = min.mapWithBcVariable(max){
+  (minimum,maximum) => (minimum - 2 * (maximum - minimum), maximum + 2 
* (maximum - minimum))
+}
+val ret: DataSet[OnlineHistogram] = data.mapWithBcVariable(stats){ (x, 
statistics) =>
+  if(bins > 0){
+val h = new ContinuousHistogram(1, statistics._1, statistics._2)
+h.loadData(Array((x, 1)))
+h
+  } else{
--- End diff --

Need a space between `else` and `{`.


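The `createHistogram` snippet under review widens the observed [min, max] range before seeding the histogram. A self-contained sketch of just that range expansion (plain Java stand-in, not the Flink ML API):

```java
// Range expansion used by the reviewed createHistogram:
//   lower = min - 2 * (max - min),  upper = max + 2 * (max - min)
// so every bin boundary of the ContinuousHistogram comfortably brackets
// the observed data. Names here are illustrative only.
public class RangeExpansion {
    static double[] expand(double min, double max) {
        double spread = max - min;
        return new double[] { min - 2 * spread, max + 2 * spread };
    }

    public static void main(String[] args) {
        double[] bounds = expand(0.0, 10.0);
        System.out.println(bounds[0] + ", " + bounds[1]); // -20.0, 30.0
    }
}
```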


[GitHub] flink pull request: [FLINK-2530]optimize equal() of AcknowledgeChe...

2015-08-16 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1024#issuecomment-131512254
  
I don't think these statements are equivalent.

Assume that `this.state == null` and `that.state != null`.

In the original version we evaluate `that.state == null`, which is false, so 
the overall result is false.

In your version we would evaluate `(this.state == null || 
this.state.equals(that.state))`, which is true, making the overall result true.

Unless I made a mistake, -1.
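The non-equivalence is easy to check in isolation. In this plain-Java sketch, `original` mirrors the null-guarded comparison and `proposed` the combined condition described in the comment; both names are illustrative, not the PR's actual code:

```java
// Demonstrates that collapsing the null check into
//   a == null || a.equals(b)
// changes the result for a == null, b != null.
public class EqualsCheck {
    static boolean original(Object a, Object b) {
        // null-guarded: equal only if both null, or both non-null and equal
        return (a == null && b == null) || (a != null && b != null && a.equals(b));
    }

    static boolean proposed(Object a, Object b) {
        // collapsed form: wrongly true when a == null and b != null
        return a == null || a.equals(b);
    }

    public static void main(String[] args) {
        Object a = null, b = "state";
        System.out.println(original(a, b)); // false
        System.out.println(proposed(a, b)); // true
    }
}
```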




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144068
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala ---
@@ -119,4 +120,31 @@ object MLUtils {
 
 stringRepresentation.writeAsText(filePath)
   }
+
+  /** Create a [[DataSet]] of [[OnlineHistogram]] from the input data
+*
+* @param bins Number of bins required. Zero for 
[[CategoricalHistogram]]
+* @param data input [[DataSet]] of [[Double]]
+* @return [[DataSet]] of [[OnlineHistogram]]
+*/
+  private [ml] def createHistogram(data: DataSet[Double], bins: Int): 
DataSet[OnlineHistogram] = {
+val min = data.reduce((x, y) => Math.min(x, y))
+val max = data.reduce((x, y) => Math.max(x, y))
+
+val stats = min.mapWithBcVariable(max){
+  (minimum,maximum) => (minimum - 2 * (maximum - minimum), maximum + 2 
* (maximum - minimum))
+}
+val ret: DataSet[OnlineHistogram] = data.mapWithBcVariable(stats){ (x, 
statistics) =>
+  if(bins > 0){
--- End diff --

`if (bins > 0) {` would be better.




[jira] [Commented] (FLINK-2530) optimize equal() of AcknowledgeCheckpoint

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698638#comment-14698638
 ] 

ASF GitHub Bot commented on FLINK-2530:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1024#issuecomment-131534953
  
Looking at the pure logic this would work, but you can't remove that.state 
!= null since that could result in a NullPointerException inside equals.


 optimize equal() of AcknowledgeCheckpoint
 -

 Key: FLINK-2530
 URL: https://issues.apache.org/jira/browse/FLINK-2530
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[GitHub] flink pull request: [FLINK-2530]optimize equal() of AcknowledgeChe...

2015-08-16 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1024#issuecomment-131534953
  
Looking at the pure logic this would work, but you can't remove that.state 
!= null since that could result in a NullPointerException inside equals.
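For reference, `java.util.Objects.equals` expresses both null checks at once without the NullPointerException risk (a sketch, not the PR's code):

```java
import java.util.Objects;

// Objects.equals(a, b) returns true if both are null, false if exactly
// one is null, and otherwise delegates to a.equals(b), so neither the
// this.state == null nor the that.state != null guard is needed.
public class NullSafeEquals {
    static boolean statesEqual(Object thisState, Object thatState) {
        return Objects.equals(thisState, thatState);
    }

    public static void main(String[] args) {
        System.out.println(statesEqual(null, null)); // true
        System.out.println(statesEqual(null, "s"));  // false
        System.out.println(statesEqual("s", "s"));   // true
    }
}
```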




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144152
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Generally we try to keep one PR for one issue; exceptions should only be 
made for closely related issues.

Why did you decide to add these issues into this PR? I have a hard time 
understanding it, since the commits barely touch the same files.




[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698604#comment-14698604
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144152
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Generally we try to keep one PR for one issue; exceptions should only be 
made for closely related issues.

Why did you decide to add these issues into this PR? I have a hard time 
understanding it, since the commits barely touch the same files.


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37144298
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
 ---
@@ -25,65 +25,68 @@
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationTail.class);
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
--- End diff --

This is probably a copy-paste error - StreamIterationTail.class




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-131518423
  
The occasional space issues are because I stopped using Idea's reformat 
tool. It sometimes messes up the indentations. :') 
And no. Your comments weren't aggressive. I was apologizing for the trivial 
oversights in my code.
Addressed the latest comments too. :)




[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698612#comment-14698612
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37144298
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
 ---
@@ -25,65 +25,68 @@
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationTail.class);
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
--- End diff --

This is probably a copy-paste error - StreamIterationTail.class


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks are fail and are canceled, they report a plethora of 
 followup exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and followup exceptions are not reported. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.





[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698610#comment-14698610
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144275
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

I would prefer if you opened a second PR once this is merged. The issues 
are not really related to each other; the 2nd commit was simply made based on 
the 1st commit. We would end up having two separate discussions in 1 PR, which 
I think is a bad idea.


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698616#comment-14698616
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/1026

[FLINK-2488][FLINK-2496] Expose Task Manager configuration and Task attempt 
number to Runtime context

This PR fixes these issues:
1. [FLINK-2496]Expose Task Manager configuration to Runtime Context 
2. [FLINK-2488]Expose attempt number of task to Runtime Context
3. [FLINK-2524]Add getTaskNameWithSubtasks to Runtime Context

Depends on #970 for the simplified constructors for `RuntimeContext`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink flink-2488

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1026.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1026


commit a8d138519bfcf24fb63028b6ecae3e384a05b69d
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-01T13:55:42Z

[FLINK-2458]Access distributed cache entries from Iteration contexts.
[FLINK-2449]Allow use of distributed cache from Collection Environments

commit 5fbdebe5742b26683a2755d80681cc505962fc02
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-16T07:44:39Z

[FLINK-2496]Expose Task Manager configuration to Runtime Context
[FLINK-2488]Expose attempt number of task to Runtime Context
[FLINK-2524]Add getTaskNameWithSubtasks to Runtime Context




 Expose attemptNumber in RuntimeContext
 --

 Key: FLINK-2488
 URL: https://issues.apache.org/jira/browse/FLINK-2488
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Sachin Goel
Priority: Minor

 It would be nice to expose the attemptNumber of a task in the 
 {{RuntimeContext}}. 
 This would allow user code to behave differently in restart scenarios.



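As a hedged illustration of what FLINK-2488 would enable, user code could branch on the exposed attempt number to behave differently on restarts. The standalone helper below is purely hypothetical (the actual accessor would live on `RuntimeContext`):

```java
public class RetryAwareTask {
    // Hypothetical user logic: behave differently when the task is a restart,
    // which is exactly what exposing the attempt number would allow.
    static String plan(int attemptNumber) {
        return attemptNumber == 0
            ? "cold start: rebuild caches"
            : "restart #" + attemptNumber + ": reuse checkpointed state";
    }

    public static void main(String[] args) {
        System.out.println(plan(0));
        System.out.println(plan(2));
    }
}
```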


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-131523559
  
Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-131509499
  
Hi @chiwanpark , I've fixed the *cosmetic* issues. 




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144130
  
--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,100 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over 
data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` 
and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+   1. <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X:
--- End diff --

`strong` tag can be replaced by `**`. `**Continuous Histograms**` is the same 
as `<strong>Continuous Histograms</strong>`.




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-131514958
  
I found some points to improve and added line notes. :)




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144066
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala ---
@@ -119,4 +120,31 @@ object MLUtils {
 
 stringRepresentation.writeAsText(filePath)
   }
+
+  /** Create a [[DataSet]] of [[OnlineHistogram]] from the input data
+*
+* @param bins Number of bins required. Zero for 
[[CategoricalHistogram]]
+* @param data input [[DataSet]] of [[Double]]
+* @return [[DataSet]] of [[OnlineHistogram]]
+*/
+  private [ml] def createHistogram(data: DataSet[Double], bins: Int): 
DataSet[OnlineHistogram] = {
--- End diff --

`private[ml]` would be better. :)




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144175
  
--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,100 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over 
data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` 
and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+   1. <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X:
+   DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms support
+   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that $|x: x
+   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of elements $x \leq s$, which can
+   be construed as a cumulative probability value at $s$ [of course, a <i>scaled</i> probability].
+   <br>
+   2. A continuous histogram can be formed by calling `X.createHistogram(b)` where `b` is the
+   number of bins.
+   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a discrete distribution. These histograms
+   support the `count(c)` operation, which returns the number of elements associated with category `c`.
+   <br>
+   A categorical histogram can be formed by calling `X.createHistogram(0)`.
+
+### Data Statistics
+
+ The `dataStats` function operates on a data set `X: DataSet[Vector]` and returns column-wise
+ statistics for `X`. Every field of `X` is allowed to be defined as either <i>discrete</i> or
+ <i>continuous</i>.
--- End diff --

`i` tag


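The `quantile(q)` and `sum(s)` semantics described in the quoted docs can be sketched as follows. This is a naive sort-based illustration of the definitions, not the FlinkML online-histogram implementation:

```java
import java.util.Arrays;

public class HistogramSemantics {
    // quantile(q): a value x_q such that the number of elements x <= x_q
    // equals q * |X|, per the definition in the quoted statistics docs.
    static double quantile(double[] data, double q) {
        double[] sorted = data.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(q * sorted.length);
        return sorted[Math.max(0, rank - 1)];
    }

    // sum(s): the number of elements x <= s, i.e. an unscaled cumulative count.
    static int sum(double[] data, double s) {
        int count = 0;
        for (double x : data) {
            if (x <= s) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        double[] data = {1.0, 2.0, 3.0, 4.0};
        System.out.println(quantile(data, 0.5)); // 2.0: two of four elements are <= 2.0
        System.out.println(sum(data, 2.5));      // 2
    }
}
```

Dividing `sum(s)` by the data size gives the cumulative probability the docs allude to.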


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144173
  
--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,100 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over 
data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` 
and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+   1. <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X:
+   DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms support
+   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that $|x: x
+   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of elements $x \leq s$, which can
+   be construed as a cumulative probability value at $s$ [of course, a <i>scaled</i> probability].
+   <br>
+   2. A continuous histogram can be formed by calling `X.createHistogram(b)` where `b` is the
+   number of bins.
+   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a discrete distribution. These histograms
+   support the `count(c)` operation, which returns the number of elements associated with category `c`.
+   <br>
+   A categorical histogram can be formed by calling `X.createHistogram(0)`.
+
+### Data Statistics
+
+ The `dataStats` function operates on a data set `X: DataSet[Vector]` and returns column-wise
+ statistics for `X`. Every field of `X` is allowed to be defined as either <i>discrete</i> or
--- End diff --

`i` tag




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144171
  
--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,100 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over 
data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` 
and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+   1. <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X:
+   DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms support
+   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that $|x: x
+   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of elements $x \leq s$, which can
+   be construed as a cumulative probability value at $s$ [of course, a <i>scaled</i> probability].
+   <br>
+   2. A continuous histogram can be formed by calling `X.createHistogram(b)` where `b` is the
+   number of bins.
+   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
--- End diff --

`strong` tag




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144275
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

I would prefer if you opened a second PR once this is merged. The issues 
are not really related to each other; the 2nd commit was simply made based on 
the 1st commit. We would end up having two separate discussions in 1 PR, which 
I think is a bad idea.




[jira] [Updated] (FLINK-2529) fix on some unused code for flink-runtime

2015-08-16 Thread Huang Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Wei updated FLINK-2529:
-
Description: 
In file BlobServer.java, I found that Thread.currentThread() will never return 
null, as far as I know.
So I think "shutdownHook != null" is not necessary in the code 
'if (shutdownHook != null && shutdownHook != Thread.currentThread())'.
I am not completely sure; maybe I am wrong.

 fix on some unused code for flink-runtime
 -

 Key: FLINK-2529
 URL: https://issues.apache.org/jira/browse/FLINK-2529
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
Priority: Minor
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 In file BlobServer.java, I found that Thread.currentThread() will never return 
 null, as far as I know.
 So I think "shutdownHook != null" is not necessary in the code 
 'if (shutdownHook != null && shutdownHook != Thread.currentThread())'.
 I am not completely sure; maybe I am wrong.





[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698607#comment-14698607
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144194
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Yes. The addition of distributed cache removes the need for multiple 
constructors for `RuntimeContext`s. Since providing access to runtime 
information needed changing the constructors, I deemed it better to work with 
what would be the only needed constructors after merging this. 
I can revert this commit and open a separate PR for the *other* three 
issues if necessary.


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144183
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends 
OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+    data.get(c) match{
+      case None =>
+        require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+        data.put(c, 1)
+      case Some(value) =>
+        data.update(c, value + 1)
+    }
+  }
+
+  /** Merges the histogram with h and returns a new histogram
+*
+* @param h histogram to be merged
+* @param B number of categories in the resultant histogram.
+*  (Default: ```0```, number of categories will be the size of 
union of categories in
+*  both histograms)
+* @return Merged histogram with capacity B
+*/
+  override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram 
= {
+    h match {
+      case h1: CategoricalHistogram => {
+        val finalMap = new mutable.HashMap[Double, Int]()
+        data.iterator.foreach(x => finalMap.put(x._1, x._2))
+        h1.data.iterator.foreach(x => {
+          finalMap.get(x._1) match{
+            case None => finalMap.put(x._1, x._2)
+            case Some(value) => finalMap.update(x._1, x._2 + value)
+          }
+        })
+        require(B == 0 || finalMap.size <= B, "Insufficient capacity. Failed to merge")
+        var finalSize = finalMap.size
+        if (B > 0) {
+          finalSize = B
+        }
+        val ret = new CategoricalHistogram(finalSize)
+        ret.loadData(finalMap.toArray)
+        ret
+      }
+      case default =>
+        throw new RuntimeException("Only a categorical histogram is allowed to be merged with a " +
+          "categorical histogram")
+    }
+  }
+
+  /** Number of elements in category c
+*
+* @return Number of points in category c
+*/
+  def count(c: Double): Int = {
+data.get(c) match{
--- End diff --

We need a space between `match` and `{`.


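The merge logic in the quoted `CategoricalHistogram.merge` boils down to a union of category-to-count maps, summing the counts of shared categories. A small stand-alone sketch of just that step (Java, with hypothetical names; the capacity check and result construction are omitted):

```java
import java.util.HashMap;
import java.util.Map;

public class CategoryMergeSketch {
    // Union two category -> count maps, summing counts where categories overlap,
    // mirroring the finalMap construction in the quoted Scala diff.
    static Map<Double, Integer> merge(Map<Double, Integer> a, Map<Double, Integer> b) {
        Map<Double, Integer> result = new HashMap<>(a);
        for (Map.Entry<Double, Integer> e : b.entrySet()) {
            // Map.merge inserts the value if the key is absent,
            // otherwise combines it with the existing value.
            result.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Double, Integer> a = new HashMap<>();
        a.put(1.0, 2);
        Map<Double, Integer> b = new HashMap<>();
        b.put(1.0, 3);
        b.put(2.0, 1);
        System.out.println(merge(a, b));
    }
}
```

The resulting map size is what the quoted code compares against the capacity bound `B` before building the merged histogram.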


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144182
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends 
OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+    data.get(c) match{
+      case None =>
+        require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+        data.put(c, 1)
+      case Some(value) =>
+        data.update(c, value + 1)
+    }
+  }
+
+  /** Merges the histogram with h and returns a new histogram
+*
+* @param h histogram to be merged
+* @param B number of categories in the resultant histogram.
+*  (Default: ```0```, number of categories will be the size of 
union of categories in
+*  both histograms)
+* @return Merged histogram with capacity B
+*/
+  override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram 
= {
+    h match {
+      case h1: CategoricalHistogram => {
+        val finalMap = new mutable.HashMap[Double, Int]()
+        data.iterator.foreach(x => finalMap.put(x._1, x._2))
+        h1.data.iterator.foreach(x => {
+          finalMap.get(x._1) match{
+            case None => finalMap.put(x._1, x._2)
+            case Some(value) => finalMap.update(x._1, x._2 + value)
+          }
+        })
+        require(B == 0 || finalMap.size <= B, "Insufficient capacity. Failed to merge")
+        var finalSize = finalMap.size
+        if (B > 0) {
+          finalSize = B
+        }
--- End diff --

We can change `finalSize` from mutable to immutable.

```scala
val finalSize = if (B > 0) B else finalMap.size
```




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144194
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Yes. The addition of distributed cache removes the need for multiple 
constructors for `RuntimeContext`s. Since providing access to runtime 
information needed changing the constructors, I deemed it better to work with 
what would be the only needed constructors after merging this. 
I can revert this commit and open a separate PR for the *other* three 
issues if necessary.




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144195
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends 
OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+    data.get(c) match{
+      case None =>
+        require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+        data.put(c, 1)
+      case Some(value) =>
+        data.update(c, value + 1)
+    }
+  }
+
+  /** Merges the histogram with h and returns a new histogram
+*
+* @param h histogram to be merged
+* @param B number of categories in the resultant histogram.
+*  (Default: ```0```, number of categories will be the size of union of categories in
+*  both histograms)
+* @return Merged histogram with capacity B
+*/
+  override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram = {
+h match {
+  case h1: CategoricalHistogram => {
+val finalMap = new mutable.HashMap[Double, Int]()
+data.iterator.foreach(x => finalMap.put(x._1, x._2))
+h1.data.iterator.foreach(x => {
+finalMap.get(x._1) match{
--- End diff --

A space is needed between `match` and `{`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144192
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+data.get(c) match{
--- End diff --

A space is needed between `match` and `{`.




[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698613 ]

ASF GitHub Bot commented on FLINK-2458:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144300
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
--- End diff --

Ah. Yes. That makes sense. I will revert this and open a separate PR. 
Apologies.


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-08-16 Thread sachingoel0101
GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/1026

[FLINK-2488][FLINK-2496] Expose Task Manager configuration and Task attempt 
number to Runtime context

This PR fixes these issues:
1. [FLINK-2496]Expose Task Manager configuration to Runtime Context 
2. [FLINK-2488]Expose attempt number of task to Runtime Context
3. [FLINK-2524]Add getTaskNameWithSubtasks to Runtime Context

Depends on #970 for simplistic constructors for `RuntimeContext`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink flink-2488

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1026.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1026


commit a8d138519bfcf24fb63028b6ecae3e384a05b69d
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-01T13:55:42Z

[FLINK-2458]Access distributed cache entries from Iteration contexts.
[FLINK-2449]Allow use of distributed cache from Collection Environments

commit 5fbdebe5742b26683a2755d80681cc505962fc02
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-16T07:44:39Z

[FLINK-2496]Expose Task Manager configuration to Runtime Context
[FLINK-2488]Expose attempt number of task to Runtime Context
[FLINK-2524]Add getTaskNameWithSubtasks to Runtime Context






[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37144137
  
--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,100 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over 
data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` 
and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+   1. <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X:
+   DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms 
support
+   `quantile` and `sum`  operations. Here `quantile(q)` refers to a value 
$x_q$ such that $|x: x
+   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of 
elements $x \leq s$, which can
+be construed as a cumulative probability value at $s$ [Of course, <i>scaled</i> probability].
--- End diff --

The `<i>` tag can be replaced by `*`: `<i>scaled</i>` can be represented as 
`*scaled*`.
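The `quantile`/`sum` semantics described in the documentation above can be sketched in plain Scala. This is an illustrative sketch only; `HistogramSemantics` and its methods are hypothetical names, not the actual FlinkML API.

```scala
// Hypothetical illustration of the quantile/sum semantics described in the
// statistics docs above; NOT the actual FlinkML histogram implementation.
object HistogramSemantics {

  // quantile(q): smallest value x_q such that at least q * |X| elements
  // of the data set are <= x_q
  def quantile(data: Seq[Double], q: Double): Double = {
    require(q > 0 && q <= 1, "q must be in (0, 1]")
    val sorted = data.sorted
    val rank = math.ceil(q * sorted.size).toInt
    sorted(rank - 1)
  }

  // sum(s): number of elements x <= s, i.e. an (unscaled) cumulative count at s
  def sum(data: Seq[Double], s: Double): Int =
    data.count(_ <= s)
}
```

Dividing `sum(s)` by the data set size gives the scaled cumulative probability the doc refers to.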




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-16 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-131520764
  
I just found that `Predef.assume` could be replaced by `assume`. Sorry for the 
wrong guidance. Could you change `Predef.assume` to `assume`?




[GitHub] flink pull request: [FLINK-2530]optimize equal() of AcknowledgeChe...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1024#issuecomment-131577837
  
Is this really in need of optimization? It is written to be failsafe: it 
does not crash even if `state` does not handle null values properly. This is 
good, because it helps prevent future bugs. The performance cost is minimal.

Concerning performance: this is an actor message, sent and processed 
asynchronously by the JobManager. These code paths should never saturate the 
CPU in any reasonable setup, and even the computation during the processing of 
this message is dominated by other parts (cache misses during hash table 
lookups). Checking further, this method is never even executed. It is there only 
for the sake of testing / debuggability.

Please double-check whether such statements really need fixing. Eliminating 
checks for the sake of performance is really only necessary on the most 
performance-critical code paths (the inner loops of the runtime operators and 
algorithms).
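A minimal sketch of the null-safe `equals` pattern being defended here, assuming a hypothetical message class `Ack` with a nullable `state` field (this is not the actual `AcknowledgeCheckpoint` code):

```scala
// Hypothetical message class illustrating a null-safe equals: the comparison
// does not crash even when state is null on either side.
case class JobId(id: Long)

class Ack(val jobId: JobId, val state: AnyRef) {
  override def equals(other: Any): Boolean = other match {
    case that: Ack =>
      jobId == that.jobId &&
        // null-safe: handle null explicitly before delegating to equals
        (if (state == null) that.state == null else state.equals(that.state))
    case _ => false
  }

  override def hashCode(): Int =
    31 * jobId.hashCode + (if (state == null) 0 else state.hashCode)
}
```

The extra null check costs a branch per comparison, which is negligible on a path that is only exercised in tests and debugging.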




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131579342
  
Looks good, merging this!




[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698745 ]

ASF GitHub Bot commented on FLINK-2458:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131579342
  
Looks good, merging this!


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[jira] [Updated] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-16 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated FLINK-2525:
---
Affects Version/s: (was: 0.8.1)

 Add configuration support in Storm-compatibility
 

 Key: FLINK-2525
 URL: https://issues.apache.org/jira/browse/FLINK-2525
 Project: Flink
  Issue Type: New Feature
  Components: flink-contrib
Reporter: fangfengbin
Assignee: fangfengbin







[GitHub] flink pull request: [FLINK-2527] [gelly] Ensure that VertexUpdateF...

2015-08-16 Thread ggevay
GitHub user ggevay opened a pull request:

https://github.com/apache/flink/pull/1027

[FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is 
called at most once

I implemented option (1), with the check to enforce that it is called at most 
once. Unfortunately, I had to add an exception specification to a number of 
methods, and users might also have to do this with already existing code. If 
you think this is not worth it, then I can remove the check.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ggevay/flink setNewVertexValueFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1027.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1027


commit 9b514633d868b28d628785a0d099115134599cee
Author: Gabor Gevay gga...@gmail.com
Date:   2015-08-16T13:26:59Z

[FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is 
called at most once per updateVertex






[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698711 ]

ASF GitHub Bot commented on FLINK-2458:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131570065
  
We are indeed falling behind on merging pull requests, right now. Many 
committers are on vacation this month, and for the others, the large amount of 
pull requests is hard to keep up with, especially next to the work on our own 
issues.

Hope this will get better in a week or two.

I'll try to get a look at this very soon...


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[GitHub] flink pull request: Some updates for programming_guide.md

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1019#issuecomment-131578024
  
This looks good, will merge this!




[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698743 ]

ASF GitHub Bot commented on FLINK-2314:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/997#issuecomment-131578723
  
Thanks for updating the pull request. I'll try to have a look at this in a 
bit!


 Make Streaming File Sources Persistent
 --

 Key: FLINK-2314
 URL: https://issues.apache.org/jira/browse/FLINK-2314
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Sheetal Parade
  Labels: easyfix, starter

 Streaming File sources should participate in the checkpointing. They should 
 track the bytes they read from the file and checkpoint it.
 One can look at the sequence generating source function for an example of a 
 checkpointed source.





[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131570065
  
We are indeed falling behind on merging pull requests, right now. Many 
committers are on vacation this month, and for the others, the large amount of 
pull requests is hard to keep up with, especially next to the work on our own 
issues.

Hope this will get better in a week or two.

I'll try to get a look at this very soon...




[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698717 ]

ASF GitHub Bot commented on FLINK-2462:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37146789
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
@@ -25,65 +25,68 @@
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
--- End diff --

Yep, it is. Good catch!


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 followup exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported, and followup exceptions are not reported. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.





[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131570789
  
Aside from the comment above, this looks good. Would merge this, after the 
comment is addressed.




[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698716 ]

ASF GitHub Bot commented on FLINK-2458:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131570789
  
Aside from the comment above, this looks good. Would merge this, after the 
comment is addressed.


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37146789
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
@@ -25,65 +25,68 @@
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
--- End diff --

Yep, it is. Good catch!




[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698738 ]

ASF GitHub Bot commented on FLINK-2527:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1027#issuecomment-131576936
  
I think you can make this non-API-breaking by simply throwing an 
`IllegalStateException`, which is a `RuntimeException` and therefore does not 
need to appear in the method signatures.


 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 -

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?
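Option 2 above (record the value in outVal and emit only the last recorded value after updateVertex returns) can be sketched in plain Scala. The names here are illustrative, not the actual Gelly code; `emit` stands in for the framework's output collector.

```scala
// Sketch of option 2: setNewVertexValue only records the value; the framework
// wrapper emits the last recorded value once, after the user update returns.
// Hypothetical names, not the actual Gelly VertexUpdateFunction internals.
class BufferingUpdater[VV](emit: VV => Unit) {
  private var outVal: Option[VV] = None

  // may be called any number of times by the user; nothing is emitted yet
  def setNewVertexValue(value: VV): Unit =
    outVal = Some(value)

  // framework-side wrapper around the user's updateVertex
  def runUpdate(userUpdate: () => Unit): Unit = {
    outVal = None
    userUpdate()
    outVal.foreach(emit) // last value wins; at most one record is emitted
  }
}
```

With this, repeated calls no longer produce multiple workset records, so the downstream coGroups see exactly one value per vertex.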





[GitHub] flink pull request: [FLINK-2527] [gelly] Ensure that VertexUpdateF...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1027#issuecomment-131576936
  
I think you can make this non-API-breaking by simply throwing an 
`IllegalStateException`, which is a `RuntimeException` and therefore does not 
need to appear in the method signatures.
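The suggested at-most-once guard with an unchecked `IllegalStateException` can be sketched as follows. `VertexUpdater` and its members are illustrative, not the actual Gelly `VertexUpdateFunction`:

```scala
// Sketch of an at-most-once guard for setNewVertexValue using an unchecked
// IllegalStateException, so method signatures stay unchanged.
// Hypothetical names, not the actual Gelly API.
class VertexUpdater[VV] {
  private var newValueSet = false
  private var outVal: Option[VV] = None

  def setNewVertexValue(value: VV): Unit = {
    if (newValueSet) {
      throw new IllegalStateException(
        "setNewVertexValue can only be called once per updateVertex")
    }
    newValueSet = true
    outVal = Some(value)
  }

  def lastValue: Option[VV] = outVal

  // called by the framework before each updateVertex invocation
  def reset(): Unit = { newValueSet = false; outVal = None }
}
```

Because `IllegalStateException` is unchecked, user code that never calls the setter twice compiles and runs unchanged.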




[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698670 ]

ASF GitHub Bot commented on FLINK-2527:
---

GitHub user ggevay opened a pull request:

https://github.com/apache/flink/pull/1027

[FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is 
called at most once

I implemented option (1), with the check to enforce that it is called at most 
once. Unfortunately, I had to add an exception specification to a number of 
methods, and users might also have to do this with already existing code. If 
you think this is not worth it, then I can remove the check.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ggevay/flink setNewVertexValueFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1027.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1027


commit 9b514633d868b28d628785a0d099115134599cee
Author: Gabor Gevay gga...@gmail.com
Date:   2015-08-16T13:26:59Z

[FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is 
called at most once per updateVertex




 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 -

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?





[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37146808
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+	
+	private volatile boolean running = true;
 
	@Override
-	public void registerInputOutput() {
-		try {
-			super.registerInputOutput();
+	public void init() throws Exception {
+		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);

-			TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-			TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+		int numberOfInputs = configuration.getNumberOfInputs();

-			int numberOfInputs = configuration.getNumberOfInputs();
+		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

-			ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-			ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

-			List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-			
-			for (int i = 0; i < numberOfInputs; i++) {
-				int inputType = inEdges.get(i).getTypeNumber();
-				InputGate reader = getEnvironment().getInputGate(i);
-				switch (inputType) {
-					case 1:
-						inputList1.add(reader);
-						break;
-					case 2:
-						inputList2.add(reader);
-						break;
-					default:
-						throw new RuntimeException("Invalid input type number: " + inputType);
-				}
+		for (int i = 0; i < numberOfInputs; i++) {
+			int inputType = inEdges.get(i).getTypeNumber();
+			InputGate reader = getEnvironment().getInputGate(i);
+			switch (inputType) {
+				case 1:
+					inputList1.add(reader);
+					break;
+				case 2:
--- End diff --

This was actually part of the original code - I did not modify it as part 
of this pull request.
As far as I see it, the `StreamEdge` code is part of the API, not the 
runtime. It may be adjusted as part of  #988 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698718#comment-14698718
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37146808
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
 	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+	private volatile boolean running = true;
 
 	@Override
-	public void registerInputOutput() {
-		try {
-			super.registerInputOutput();
+	public void init() throws Exception {
+		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 
-			TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-			TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+		int numberOfInputs = configuration.getNumberOfInputs();
 
-			int numberOfInputs = configuration.getNumberOfInputs();
+		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
-			ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-			ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 
-			List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-			for (int i = 0; i < numberOfInputs; i++) {
-				int inputType = inEdges.get(i).getTypeNumber();
-				InputGate reader = getEnvironment().getInputGate(i);
-				switch (inputType) {
-					case 1:
-						inputList1.add(reader);
-						break;
-					case 2:
-						inputList2.add(reader);
-						break;
-					default:
-						throw new RuntimeException("Invalid input type number: " + inputType);
-				}
+		for (int i = 0; i < numberOfInputs; i++) {
+			int inputType = inEdges.get(i).getTypeNumber();
+			InputGate reader = getEnvironment().getInputGate(i);
+			switch (inputType) {
+				case 1:
+					inputList1.add(reader);
+					break;
+				case 2:
--- End diff --

This was actually part of the original code - I did not modify it as part 
of this pull request.
As far as I see it, the `StreamEdge` code is part of the API, not the 
runtime. It may be adjusted as part of  #988 


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that makes sure that root causes are 
 reported and follow-up exceptions are not. That makes debugging much 
 easier.
 A big part of that is to have a single consistent place that logs exceptions, 
 and that has a view of whether the operation is still running, or whether it 
 has been canceled.
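A minimal sketch of the reporting model the issue describes (not Flink's actual implementation, and all names here are illustrative): a volatile cancellation flag guards a single reporting path, so only the root cause is recorded and exceptions raised after cancellation are dropped.

```java
public class CancelAwareTask {

    private volatile boolean running = true;

    String reportedFailure; // stands in for the single logged root cause

    void cancel() {
        running = false;
    }

    // The one consistent place that reports exceptions: it records only
    // the first failure, and only while the task is still running.
    void handleException(Exception e) {
        if (running && reportedFailure == null) {
            reportedFailure = e.getMessage();
        }
        // follow-up exceptions, and anything arriving after cancel(), are suppressed
    }

    public static void main(String[] args) {
        CancelAwareTask task = new CancelAwareTask();
        task.handleException(new RuntimeException("root cause"));
        task.cancel();
        task.handleException(new RuntimeException("follow-up after cancel"));
        System.out.println(task.reportedFailure); // prints "root cause"
    }
}
```

The point of the design is that every code path funnels its exceptions through one method that can consult the cancellation state, instead of each operator logging on its own.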



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-2458) Distributed Cache doesn't work with iterations.

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698790#comment-14698790
 ] 

ASF GitHub Bot commented on FLINK-2458:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/970


 Distributed Cache doesn't work with iterations.
 ---

 Key: FLINK-2458
 URL: https://issues.apache.org/jira/browse/FLINK-2458
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel
Assignee: Sachin Goel

 Distributed cache files cannot be accessed when working in Iterative 
 contexts. While constructing the context in {{AbstractIterativePactTask}}, 
 the distributed entries are not passed on to runtime context.
 My commit will add this and also unify the distributed cache system to work 
 everywhere, including in the Collection Environment.





[jira] [Commented] (FLINK-2487) the array has out of bounds

2015-08-16 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698793#comment-14698793
 ] 

Stephan Ewen commented on FLINK-2487:
-

Added a fix in 0a7cc02354abc985b92704729a8c12a856056398

(I suspect that the fix belonged to this issue. Hard to say without a better 
description.)

 the array has out of bounds
 ---

 Key: FLINK-2487
 URL: https://issues.apache.org/jira/browse/FLINK-2487
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: zhangrucong
Priority: Minor







[jira] [Commented] (FLINK-2478) The page “FlinkML - Machine Learning for Flink“ https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link

2015-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698791#comment-14698791
 ] 

ASF GitHub Bot commented on FLINK-2478:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1021


 The page “FlinkML - Machine Learning for Flink“  
 https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a 
 dead link
 -

 Key: FLINK-2478
 URL: https://issues.apache.org/jira/browse/FLINK-2478
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 0.10
Reporter: Slim Baltagi
Assignee: Till Rohrmann
Priority: Minor

 Note that FlinkML is currently not part of the binary distribution. See 
 linking with it for cluster execution here.
 'here' links to a dead link: 
 https://ci.apache.org/projects/flink/flink-docs-master/libs/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
 The correct link is: 
 https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution





[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/970




[GitHub] flink pull request: [FLINK-2314] - Added Checkpointing to File Sou...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/997#issuecomment-131578723
  
Thanks for updating the pull request. I'll try to have a look at this in a 
bit!




[jira] [Updated] (FLINK-2525) Add configuration support in Storm-compatibility

2015-08-16 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated FLINK-2525:
---
Description: Spouts and Bolts are initialized by a call to `Spout.open(...)` 
and `Bolt.prepare()`, respectively. Both methods take a config `Map` as their 
first parameter. This map is currently not populated, so Spouts and Bolts cannot 
be configured with user-defined parameters. In order to support this feature, the 
spout and bolt wrapper classes need to be extended to create a proper `Map` 
object. Furthermore, the clients need to be extended to take a `Map` and translate 
it into a Flink `Configuration`, which is forwarded to the wrappers for proper 
initialization of the map.

 Add configuration support in Storm-compatibility
 

 Key: FLINK-2525
 URL: https://issues.apache.org/jira/browse/FLINK-2525
 Project: Flink
  Issue Type: New Feature
  Components: flink-contrib
Reporter: fangfengbin
Assignee: fangfengbin

 Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
 `Bolt.prepare()`, respectively. Both methods take a config `Map` as their 
 first parameter. This map is currently not populated, so Spouts and Bolts cannot 
 be configured with user-defined parameters. In order to support this feature, 
 the spout and bolt wrapper classes need to be extended to create a proper `Map` 
 object. Furthermore, the clients need to be extended to take a `Map` and 
 translate it into a Flink `Configuration`, which is forwarded to the wrappers 
 for proper initialization of the map.
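The round trip described above can be sketched roughly as follows. This is a hedged illustration, not the flink-contrib API: `Properties` stands in for Flink's `Configuration`, and the class and key names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StormConfigTranslation {

    // Client side: flatten the user-supplied map into string key/value pairs.
    // Properties stands in for org.apache.flink.configuration.Configuration here.
    static Properties toConfiguration(Map<String, Object> userConf) {
        Properties flinkConfig = new Properties();
        for (Map.Entry<String, Object> e : userConf.entrySet()) {
            flinkConfig.setProperty(e.getKey(), String.valueOf(e.getValue()));
        }
        return flinkConfig;
    }

    // Wrapper side: rebuild the Map that Spout.open(...) / Bolt.prepare() expects
    // before handing it to the wrapped spout or bolt.
    static Map<String, String> toStormConf(Properties flinkConfig) {
        Map<String, String> conf = new HashMap<>();
        for (String key : flinkConfig.stringPropertyNames()) {
            conf.put(key, flinkConfig.getProperty(key));
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put("topology.workers", 4);
        Map<String, String> conf = toStormConf(toConfiguration(user));
        System.out.println(conf.get("topology.workers")); // prints "4"
    }
}
```

Note that this sketch stringifies all values; a real translation would need to preserve the value types Storm configurations allow.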





[GitHub] flink pull request: [FLINK-2478]fix the array may have out of boun...

2015-08-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1021




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131570435
  
In the `CollectionExecutor`, can you skip creating the `ExecutorService`? 
You can eagerly resolve the path and then put an already finished future into 
the map.
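The suggestion above — resolve eagerly and store an already completed future so no `ExecutorService` is needed — can be sketched like this. The class and field names are hypothetical stand-ins, not `CollectionExecutor`'s actual internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class EagerCacheResolution {

    // Hypothetical stand-in for the executor's cache-file registry.
    static final Map<String, CompletableFuture<String>> cacheCopyTasks = new HashMap<>();

    // Eagerly resolve the path and store an already completed future,
    // so no ExecutorService has to be created at all.
    static void register(String name, String resolvedPath) {
        cacheCopyTasks.put(name, CompletableFuture.completedFuture(resolvedPath));
    }

    public static void main(String[] args) {
        register("model", "/tmp/cache/model.bin");
        // join() returns immediately because the future is already complete
        System.out.println(cacheCopyTasks.get("model").join());
    }
}
```

Consumers that expect a `Future` see no difference, while the collection-based execution path avoids spawning threads for work that is effectively synchronous.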



