[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-09 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653177#comment-15653177
 ] 

Xiaogang Shi commented on FLINK-5023:
-------------------------------------

[~aljoscha] [~StephanEwen] I have updated the PR. Now `State` provides only a 
read-only accessor, and a new interface called `UpdatableState` has been added.
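A minimal sketch of the resulting split (for illustration only; the exact 
method sets and generics are assumptions, not the PR's actual signatures):

{code}
// Read-only view: what stays in the State interface.
public interface State<T> {
  T get();       // returns the structured value under the current key
  void clear();  // removes the value under the current key
}

// Mutation moves to the new interface.
public interface UpdatableState<T> extends State<T> {
  void update(T value);  // replaces the value under the current key
}
{code}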

> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states, e.g., `value()` in ValueState and `get()` in ListState. 
> Modifying the interface would better abstract these states.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5017) Introduce WatermarkStatus stream element to allow for temporarily idle streaming sources

2016-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653044#comment-15653044
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5017:


Yes I think that's better than {{WatermarkStatus}}, thanks. Will use 
{{StreamStatus}}.

> Introduce WatermarkStatus stream element to allow for temporarily idle 
> streaming sources
> 
>
> Key: FLINK-5017
> URL: https://issues.apache.org/jira/browse/FLINK-5017
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0
>
> Attachments: operator_chain_with_multiple_network_outputs.png
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they 
> should continue to expect watermarks from the sending operator. There are 2 
> kinds of status, namely {{IDLE}} and {{ACTIVE}}. Watermark status elements 
> are generated at the sources, and may be propagated through the operators of 
> the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit one of the status elements 
> whenever they change between the "watermark-idle" and "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an 
> indefinite amount of time. This is the case, for example, for Flink's Kafka 
> Consumer, where sources might initially have no assigned partitions to read 
> from, or where no records can be read from the assigned partitions. Once the 
> source detects that it will resume emitting data, it is considered 
> "watermark-active".
> Downstream operators with multiple inputs (e.g. head operators of a 
> {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for 
> watermarks from an upstream operator that is "watermark-idle" when deciding 
> whether or not to advance the operator's current watermark. When a downstream 
> operator determines that all upstream operators are "watermark-idle" (i.e. 
> when all input channels have received the watermark idle status element), the 
> operator is considered "watermark-idle" as well, since it will temporarily be 
> unable to advance its own watermark. This is always the case for operators 
> that read from a single upstream operator. Once an operator is considered 
> "watermark-idle", it should forward its idle status to inform downstream 
> operators. The operator is considered "watermark-active" again as soon as at 
> least one of its upstream operators resumes being "watermark-active" (i.e. as 
> soon as at least one input channel receives the watermark active status 
> element), and should then also forward its active status to inform downstream 
> operators.
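The combination logic in the last paragraph can be condensed into a small 
sketch (hypothetical class and method names; this is not Flink's actual API):

{code}
enum StreamStatus { ACTIVE, IDLE }

final class StatusTrackingValve {
  private final StreamStatus[] channelStatus;  // last status seen per input channel
  private final long[] channelWatermark;       // last watermark seen per input channel

  StatusTrackingValve(int numChannels) {
    channelStatus = new StreamStatus[numChannels];
    channelWatermark = new long[numChannels];
    java.util.Arrays.fill(channelStatus, StreamStatus.ACTIVE);
    java.util.Arrays.fill(channelWatermark, Long.MIN_VALUE);
  }

  void onStatus(int channel, StreamStatus status) {
    channelStatus[channel] = status;
  }

  void onWatermark(int channel, long watermark) {
    channelWatermark[channel] = Math.max(channelWatermark[channel], watermark);
  }

  // The operator turns idle only when every input channel is idle.
  boolean isIdle() {
    for (StreamStatus s : channelStatus) {
      if (s == StreamStatus.ACTIVE) {
        return false;
      }
    }
    return true;
  }

  // The combined watermark is the minimum over active channels only, so an
  // idle upstream operator cannot hold back the watermark.
  long combinedWatermark() {
    long min = Long.MAX_VALUE;
    for (int i = 0; i < channelStatus.length; i++) {
      if (channelStatus[i] == StreamStatus.ACTIVE) {
        min = Math.min(min, channelWatermark[i]);
      }
    }
    return min;
  }
}
{code}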



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts

2016-11-09 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652901#comment-15652901
 ] 

Xiaogang Shi commented on FLINK-5024:
-------------------------------------

I am very poor at English :( But I think "Simple" is more often used as the 
opposite of "Compound", as in simple interest vs. compound interest. 
"Primitive" is not that good because it is usually used to describe the BASIC 
elements from which other things are formed.

Maybe we need some help from native speakers lol



> Add SimpleStateDescriptor to clarify the concepts
> -
>
> Key: FLINK-5024
> URL: https://issues.apache.org/jira/browse/FLINK-5024
> Project: Flink
>  Issue Type: Improvement
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, StateDescriptors accept two type arguments: the first one is the 
> type of the created state and the second one is the type of the values in the 
> states. 
> The concept, however, is a little confusing here because for ListStates, the 
> arguments passed to the StateDescriptors are the types of the list elements 
> instead of the lists. It also makes the implementation of MapStates difficult.
> I suggest not putting the type serializer in StateDescriptors, making 
> StateDescriptors independent of the data structures of the values. 
> A new type of StateDescriptor named SimpleStateDescriptor can be provided to 
> abstract those states (namely ValueState, ReducingState, and FoldingState) 
> whose values are not composite. 
> The states (e.g. ListStates and MapStates) can implement their own 
> descriptors according to their data structures. 
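A minimal sketch of the proposed hierarchy (the class names follow the issue; 
the constructors and generics are assumptions):

{code}
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Base descriptor: carries only the name, independent of the state's data structure.
abstract class StateDescriptor {
  protected final String name;
  protected StateDescriptor(String name) { this.name = name; }
}

// Descriptor for non-composite states (ValueState, ReducingState, FoldingState):
// only here is a serializer for the single value fixed.
abstract class SimpleStateDescriptor<T> extends StateDescriptor {
  protected final TypeSerializer<T> serializer;
  protected SimpleStateDescriptor(String name, TypeSerializer<T> serializer) {
    super(name);
    this.serializer = serializer;
  }
}

// Composite states define their own descriptors, e.g. a list state describes
// its element type rather than the list type.
abstract class ListStateDescriptorSketch<E> extends StateDescriptor {
  protected final TypeSerializer<E> elementSerializer;
  protected ListStateDescriptorSketch(String name, TypeSerializer<E> elementSerializer) {
    super(name);
    this.elementSerializer = elementSerializer;
  }
}
{code}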



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4692) Add tumbling group-windows for batch tables

2016-11-09 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652887#comment-15652887
 ] 

Jark Wu commented on FLINK-4692:


Hi guys, I moved the sliding window into FLINK-5047 and kept this issue only 
for the tumbling window. I suggest continuing the discussion of the sliding 
window implementation under FLINK-5047.

> Add tumbling group-windows for batch tables
> ---
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> Add Tumble group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4692) Add tumbling group-windows for batch tables

2016-11-09 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-4692:
---
Summary: Add tumbling group-windows for batch tables  (was: Add tumbling 
and sliding group-windows for batch tables)

> Add tumbling group-windows for batch tables
> ---
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> Add Tumble group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4692) Add tumbling and sliding group-windows for batch tables

2016-11-09 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-4692:
---
Description: Add Tumble group-windows for batch tables as described in 
[FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
   (was: Add Tumble and Slide group-windows for batch tables as described in 
[FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
 )

> Add tumbling and sliding group-windows for batch tables
> ---
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> Add Tumble group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5047) Add sliding group-windows for batch tables

2016-11-09 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-5047:
---
Summary: Add sliding group-windows for batch tables  (was: Add Sliding 
group-windows for batch tables)

> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11](https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations).
> There are two ways to implement sliding windows for batch:
> 1. replicate the output in order to assign keys for overlapping windows. This 
> is probably the more straight-forward implementation and supports any 
> aggregation function but blows up the data volume.
> 2. if the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes) this would mean to first compute aggregates of 
> non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of 
> these into a sliding window (could be done in a MapPartition with sorted 
> input). The implementation could be done as an optimizer rule to split the 
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe 
> it makes sense to implement the WINDOW clause first and reuse this for 
> sliding windows.
> see FLINK-4692 for more discussion
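As a worked example of approach 2 above (a sketch with assumed helper names, 
not code from the issue): for Slide(10 minutes, 2 minutes), the pane size is 
gcd(10, 2) = 2 minutes, and each sliding window combines 10 / 2 = 5 consecutive 
panes.

{code}
// Hypothetical helper, not code from the issue: pane math for approach 2.
public class PaneMath {

  static long gcd(long a, long b) { return b == 0 ? a : gcd(b, a % b); }

  public static void main(String[] args) {
    long size = 10 * 60_000L;          // window size: 10 minutes in ms
    long slide = 2 * 60_000L;          // window slide: 2 minutes in ms
    long pane = gcd(size, slide);      // tumbling pane size: 2 minutes
    long panesPerWindow = size / pane; // 5 panes are combined per window
    // the sliding window ending at time t combines the tumbling panes
    // (t - size, t - size + pane], ..., (t - pane, t]
    System.out.println(pane + " ms panes, " + panesPerWindow + " per window");
  }
}
{code}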



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5047) Add sliding group-windows for batch tables

2016-11-09 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-5047:
---
Description: 
Add Slide group-windows for batch tables as described in 
[FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].

There are two ways to implement sliding windows for batch:
1. replicate the output in order to assign keys for overlapping windows. This 
is probably the more straight-forward implementation and supports any 
aggregation function but blows up the data volume.
2. if the aggregation functions are combinable / pre-aggregatable, we can also 
find the largest tumbling window size from which the sliding windows can be 
assembled. This is basically the technique used to express sliding windows with 
plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 minutes, 2 
minutes) this would mean to first compute aggregates of non-overlapping 
(tumbling) 2 minute windows and assembling consecutively 5 of these into a 
sliding window (could be done in a MapPartition with sorted input). The 
implementation could be done as an optimizer rule to split the sliding 
aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe it makes 
sense to implement the WINDOW clause first and reuse this for sliding windows.

see FLINK-4692 for more discussion

  was:
Add Slide group-windows for batch tables as described in 
[FLIP-11](https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations).

There are two ways to implement sliding windows for batch:
1. replicate the output in order to assign keys for overlapping windows. This 
is probably the more straight-forward implementation and supports any 
aggregation function but blows up the data volume.
2. if the aggregation functions are combinable / pre-aggregatable, we can also 
find the largest tumbling window size from which the sliding windows can be 
assembled. This is basically the technique used to express sliding windows with 
plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 minutes, 2 
minutes) this would mean to first compute aggregates of non-overlapping 
(tumbling) 2 minute windows and assembling consecutively 5 of these into a 
sliding window (could be done in a MapPartition with sorted input). The 
implementation could be done as an optimizer rule to split the sliding 
aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe it makes 
sense to implement the WINDOW clause first and reuse this for sliding windows.

see FLINK-4692 for more discussion


> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. replicate the output in order to assign keys for overlapping windows. This 
> is probably the more straight-forward implementation and supports any 
> aggregation function but blows up the data volume.
> 2. if the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes) this would mean to first compute aggregates of 
> non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of 
> these into a sliding window (could be done in a MapPartition with sorted 
> input). The implementation could be done as an optimizer rule to split the 
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe 
> it makes sense to implement the WINDOW clause first and reuse this for 
> sliding windows.
> see FLINK-4692 for more discussion



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5047) Add Sliding group-windows for batch tables

2016-11-09 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-5047:
---
Summary: Add Sliding group-windows for batch tables  (was: Add tumbling 
group-windows for batch tables)

> Add Sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11](https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations).
> There are two ways to implement sliding windows for batch:
> 1. replicate the output in order to assign keys for overlapping windows. This 
> is probably the more straight-forward implementation and supports any 
> aggregation function but blows up the data volume.
> 2. if the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes) this would mean to first compute aggregates of 
> non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of 
> these into a sliding window (could be done in a MapPartition with sorted 
> input). The implementation could be done as an optimizer rule to split the 
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe 
> it makes sense to implement the WINDOW clause first and reuse this for 
> sliding windows.
> see FLINK-4692 for more discussion



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5047) Add tumbling group-windows for batch tables

2016-11-09 Thread Jark Wu (JIRA)
Jark Wu created FLINK-5047:
--

 Summary: Add tumbling group-windows for batch tables
 Key: FLINK-5047
 URL: https://issues.apache.org/jira/browse/FLINK-5047
 Project: Flink
  Issue Type: Sub-task
Reporter: Jark Wu


Add Slide group-windows for batch tables as described in 
[FLIP-11](https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations).

There are two ways to implement sliding windows for batch:
1. replicate the output in order to assign keys for overlapping windows. This 
is probably the more straight-forward implementation and supports any 
aggregation function but blows up the data volume.
2. if the aggregation functions are combinable / pre-aggregatable, we can also 
find the largest tumbling window size from which the sliding windows can be 
assembled. This is basically the technique used to express sliding windows with 
plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 minutes, 2 
minutes) this would mean to first compute aggregates of non-overlapping 
(tumbling) 2 minute windows and assembling consecutively 5 of these into a 
sliding window (could be done in a MapPartition with sorted input). The 
implementation could be done as an optimizer rule to split the sliding 
aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe it makes 
sense to implement the WINDOW clause first and reuse this for sliding windows.

see FLINK-4692 for more discussion



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4692) Add tumbling and sliding group-windows for batch tables

2016-11-09 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652869#comment-15652869
 ] 

Jark Wu commented on FLINK-4692:


Yes, I agree to move the sliding window to a separate issue, and we can discuss 
the implementation in more detail there.

Option 2 is a nicer way but only supports combinable aggregations. Maybe we can 
implement approach 1 in the first version and improve it in later 
issues. 

> Add tumbling and sliding group-windows for batch tables
> ---
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> Add Tumble and Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2653: [FLINK-4469] [table] Add support for user defined table f...

2016-11-09 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2653
  
Sounds great. Thanks, Fabian.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652697#comment-15652697
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2653
  
Sounds great. Thanks, Fabian.


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row into multiple output rows. This is very useful in 
> some cases, such as looking up HBase by rowkey and returning one or more rows.
> Adding a user defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval functions. 
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because 
> of Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Blink will choose the best-matching 
> eval method to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>   public Word(String word, Integer length) { this.word = word; this.length = length; }
> }
> public class SplitStringUDTF extends UDTF<Word> {
>   public Iterable<Word> eval(String str) {
>     if (str != null) {
>       for (String s : str.split(",")) {
>         collect(new Word(s, s.length()));
>       }
>     }
>   }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-09 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r87314969
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-    * Registers a [[UserDefinedFunction]] under a unique name. Replaces already existing
+    * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
     * user-defined functions under this name.
     */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit = {
-    function match {
-      case sf: ScalarFunction =>
-        // register in Table API
-        functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+    // register in Table API
+    functionCatalog.registerFunction(name, function.getClass)
 
-        // register in SQL API
-        functionCatalog.registerSqlFunction(sf.getSqlFunction(name, typeFactory))
+    // register in SQL API
+    functionCatalog.registerSqlFunction(function.getSqlFunction(name, typeFactory))
+  }
+
+  /**
+    * Registers a [[TableFunction]] under a unique name. Replaces already existing
+    * user-defined functions under this name.
+    */
+  private[flink] def registerTableFunctionInternal[T: TypeInformation](
+    name: String, tf: TableFunction[T]): Unit = {
+
-      case _ =>
-        throw new TableException("Unsupported user-defined function type.")
+    val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
+      tf.getResultType
+    } else {
+      implicitly[TypeInformation[T]]
     }
+
+    val (fieldNames, fieldIndexes) = UserDefinedFunctionUtils.getFieldInfo(typeInfo)
--- End diff --

that’s good


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652654#comment-15652654
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r87314969
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-    * Registers a [[UserDefinedFunction]] under a unique name. Replaces already existing
+    * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
     * user-defined functions under this name.
     */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit = {
-    function match {
-      case sf: ScalarFunction =>
-        // register in Table API
-        functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+    // register in Table API
+    functionCatalog.registerFunction(name, function.getClass)
 
-        // register in SQL API
-        functionCatalog.registerSqlFunction(sf.getSqlFunction(name, typeFactory))
+    // register in SQL API
+    functionCatalog.registerSqlFunction(function.getSqlFunction(name, typeFactory))
+  }
+
+  /**
+    * Registers a [[TableFunction]] under a unique name. Replaces already existing
+    * user-defined functions under this name.
+    */
+  private[flink] def registerTableFunctionInternal[T: TypeInformation](
+    name: String, tf: TableFunction[T]): Unit = {
+
-      case _ =>
-        throw new TableException("Unsupported user-defined function type.")
+    val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
+      tf.getResultType
+    } else {
+      implicitly[TypeInformation[T]]
     }
+
+    val (fieldNames, fieldIndexes) = UserDefinedFunctionUtils.getFieldInfo(typeInfo)
--- End diff --

that’s good


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row into multiple output rows. This is very useful in 
> some cases, such as looking up HBase by rowkey and returning one or more rows.
> Adding a user defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval functions. 
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because 
> of Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Blink will choose the best-matching 
> eval method to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>   public Word(String word, Integer length) { this.word = word; this.length = length; }
> }
> public class SplitStringUDTF extends UDTF<Word> {
>   public Iterable<Word> eval(String str) {
>     if (str != null) {
>       for (String s : str.split(",")) {
>         collect(new Word(s, s.length()));
>       }
>     }
>   }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5046) Avoid redundant serialization when creating the TaskDeploymentDescriptor

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652552#comment-15652552
 ] 

ASF GitHub Bot commented on FLINK-5046:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2779

[FLINK-5046] [tdd] Preserialize TaskDeploymentDescriptor information

In order to speed up the serialization of the TaskDeploymentDescriptor we 
can pre serialize
all information which stays the same for all TaskDeploymentDescriptors. The 
information which
is static for a TDD is the job related information contained in the 
ExecutionGraph and the
operator/task related information stored in the ExecutionJobVertex.

In order to pre serialize this information, this PR introduces the 
JobInformation class
and the TaskInformation class which are stored in serialized form in the 
ExecutionGraph
and the ExecutionJobVertex, respectively.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
eagerStreamConfigSerialization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2779.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2779


commit fb7621a5a5023595a89d7e92562b503ec2a039e5
Author: Till Rohrmann 
Date:   2016-11-09T18:11:36Z

[FLINK-5046] [tdd] Preserialize TaskDeploymentDescriptor information

In order to speed up the serialization of the TaskDeploymentDescriptor we 
can pre serialize
all information which stays the same for all TaskDeploymentDescriptors. The 
information which
is static for a TDD is the job related information contained in the 
ExecutionGraph and the
operator/task related information stored in the ExecutionJobVertex.

In order to pre serialize this information, this PR introduces the 
JobInformation class
and the TaskInformation class which are stored in serialized form in the 
ExecutionGraph
and the ExecutionJobVertex, respectively.




> Avoid redundant serialization when creating the TaskDeploymentDescriptor
> 
>
> Key: FLINK-5046
> URL: https://issues.apache.org/jira/browse/FLINK-5046
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> When creating the {{TaskDeploymentDescriptor}} we extract information from 
> the {{ExecutionGraph}} which is defined job-wide and from the 
> {{ExecutionJobVertex}} which is defined operator-wide. The extracted 
> information will be serialized for every subtask even though it stays the 
> same. 
> As an improvement, we can serialize this information once and give the 
> serialized byte array to the {{TaskDeploymentDescriptor}}. This will reduce 
> the serialization work Flink has to do when deploying subtasks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2779: [FLINK-5046] [tdd] Preserialize TaskDeploymentDesc...

2016-11-09 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2779

[FLINK-5046] [tdd] Preserialize TaskDeploymentDescriptor information

In order to speed up the serialization of the TaskDeploymentDescriptor we 
can pre serialize
all information which stays the same for all TaskDeploymentDescriptors. The 
information which
is static for a TDD is the job related information contained in the 
ExecutionGraph and the
operator/task related information stored in the ExecutionJobVertex.

In order to pre serialize this information, this PR introduces the 
JobInformation class
and the TaskInformation class which are stored in serialized form in the 
ExecutionGraph
and the ExecutionJobVertex, respectively.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
eagerStreamConfigSerialization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2779.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2779


commit fb7621a5a5023595a89d7e92562b503ec2a039e5
Author: Till Rohrmann 
Date:   2016-11-09T18:11:36Z

[FLINK-5046] [tdd] Preserialize TaskDeploymentDescriptor information

In order to speed up the serialization of the TaskDeploymentDescriptor we 
can pre serialize
all information which stays the same for all TaskDeploymentDescriptors. The 
information which
is static for a TDD is the job related information contained in the 
ExecutionGraph and the
operator/task related information stored in the ExecutionJobVertex.

In order to pre serialize this information, this PR introduces the 
JobInformation class
and the TaskInformation class which are stored in serialized form in the 
ExecutionGraph
and the ExecutionJobVertex, respectively.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5046) Avoid redundant serialization when creating the TaskDeploymentDescriptor

2016-11-09 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5046:


 Summary: Avoid redundant serialization when creating the 
TaskDeploymentDescriptor
 Key: FLINK-5046
 URL: https://issues.apache.org/jira/browse/FLINK-5046
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.1.3, 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.2.0, 1.1.4


When creating the {{TaskDeploymentDescriptor}} we extract information from the 
{{ExecutionGraph}} which is defined job-wide and from the 
{{ExecutionJobVertex}} which is defined operator-wide. The extracted 
information will be serialized for every subtask even though it stays the same. 

As an improvement, we can serialize this information once and give the 
serialized byte array to the {{TaskDeploymentDescriptor}}. This will reduce the 
serialization work Flink has to do when deploying subtasks.
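A minimal sketch of the idea, assuming Flink's {{SerializedValue}} utility and 
an illustrative {{JobInformation}} stand-in (the real class contents are 
defined by the change itself, not here):

{code}
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.util.SerializedValue;

// Stand-in for the job-wide information; the real fields come from the
// ExecutionGraph (and TaskInformation would do the same per ExecutionJobVertex).
class JobInformation implements Serializable {
  final String jobName;
  JobInformation(String jobName) { this.jobName = jobName; }
}

class DeploymentSketch {
  // Serialize once per job ...
  static SerializedValue<JobInformation> preSerialize(JobInformation info) throws IOException {
    return new SerializedValue<>(info);
  }
  // ... and hand the same serialized bytes to every subtask's
  // TaskDeploymentDescriptor instead of serializing the object per subtask.
}
{code}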



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652469#comment-15652469
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2653
  
Hi @wuchong, I did a first high-level pass over the PR. From what I've seen 
it looks really good and I do not expect that major changes are necessary. 
Will do a more thorough review in the coming days.

Thanks, Fabian


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row into multiple output rows. This is very useful in 
> some cases, such as looking up HBase by rowkey and returning one or more rows.
> Adding a user defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval functions. 
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because 
> of Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Blink will choose the best-matching 
> eval method to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>   public Word(String word, Integer length) { this.word = word; this.length = length; }
> }
> public class SplitStringUDTF extends UDTF<Word> {
>   public Iterable<Word> eval(String str) {
>     if (str != null) {
>       for (String s : str.split(",")) {
>         collect(new Word(s, s.length()));
>       }
>     }
>   }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2653: [FLINK-4469] [table] Add support for user defined table f...

2016-11-09 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2653
  
Hi @wuchong, I did a first high-level pass over the PR. From what I've seen 
it looks really good and I do not expect that major changes are necessary 👍. 
Will do a more thorough review in the coming days.

Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2740: [FLINK-4964] [ml]

2016-11-09 Thread tfournier314
Github user tfournier314 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r87295380
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,108 @@
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
+import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid
+
+import scala.collection.immutable.Seq
+
+/**
+  * String Indexer
+  */
+class StringIndexer extends Transformer[StringIndexer] {
+
+  private[preprocessing] var metricsOption: Option[DataSet[(String, Long)]] = None
+
+  def setHandleInvalid(value: String): this.type ={
+    parameters.add( HandleInvalid, value )
+    this
+  }
+
+}
+
+object StringIndexer {
+
+  case object HandleInvalid extends Parameter[String] {
+    val defaultValue: Option[String] = Some( "skip" )
+  }
+
+  // ==================== Factory methods ====================
+
+  def apply(): StringIndexer ={
+    new StringIndexer( )
+  }
+
+  // ====================== Operations ======================
+
+  /**
+    * Trains [[StringIndexer]] by learning the count of each string in the input DataSet.
+    */
+
+  implicit def fitStringIndexer ={
+    new FitOperation[StringIndexer, String] {
+      def fit(instance: StringIndexer, fitParameters: ParameterMap, input: DataSet[String]): Unit ={
+        val metrics = extractIndices( input )
+        instance.metricsOption = Some( metrics )
+      }
+    }
+  }
+
+  private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = {
+
+    val mapping = input
+      .mapWith( s => (s, 1) )
+      .groupBy( 0 )
+      .reduce( (a, b) => (a._1, a._2 + b._2) )
+      .partitionByRange( 1 )
--- End diff --

Indeed, I need to do a global sort, because mapping is a sorted 
DataSet[(String, Long)] of (label, index) pairs, where the most frequent item 
has index = 0.

I need to sort a DataSet of (label, frequency) pairs and then zipWithIndex to 
get the associated index.

I've just realised that sortPartition() will only sort my partitions 
locally, so how can I achieve a global sort this way?
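For reference, one way to get a total order in the DataSet API (a sketch, not 
code from this PR) is to range-partition on the sort key, sort each partition 
locally, and then apply zipWithIndex, which numbers partitions consecutively:

{code}
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class GlobalSortSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // (label, negated frequency): negating the count lets an ascending global
    // sort put the most frequent label first.
    DataSet<Tuple2<String, Long>> counts = env.fromElements(
        Tuple2.of("a", -3L), Tuple2.of("b", -1L), Tuple2.of("c", -2L));
    // Range partitioning aligns partition boundaries with the sort key, so a
    // local sortPartition yields a total order across partitions, and
    // zipWithIndex (which numbers partitions in order) gives a global rank.
    DataSet<Tuple2<Long, Tuple2<String, Long>>> ranked = DataSetUtils.zipWithIndex(
        counts.partitionByRange(1).sortPartition(1, Order.ASCENDING));
    ranked.print();  // index 0 = most frequent label
  }
}
{code}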





---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652290#comment-15652290
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r87295380
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,108 @@
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
+import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid
+
+import scala.collection.immutable.Seq
+
+/**
+  * String Indexer
+  */
+class StringIndexer extends Transformer[StringIndexer] {
+
+  private[preprocessing] var metricsOption: Option[DataSet[(String, Long)]] = None
+
+  def setHandleInvalid(value: String): this.type ={
+    parameters.add( HandleInvalid, value )
+    this
+  }
+
+}
+
+object StringIndexer {
+
+  case object HandleInvalid extends Parameter[String] {
+    val defaultValue: Option[String] = Some( "skip" )
+  }
+
+  // ==================== Factory methods ====================
+
+  def apply(): StringIndexer ={
+    new StringIndexer( )
+  }
+
+  // ====================== Operations ======================
+
+  /**
+    * Trains [[StringIndexer]] by learning the count of each string in the input DataSet.
+    */
+
+  implicit def fitStringIndexer ={
+    new FitOperation[StringIndexer, String] {
+      def fit(instance: StringIndexer, fitParameters: ParameterMap, input: DataSet[String]): Unit ={
+        val metrics = extractIndices( input )
+        instance.metricsOption = Some( metrics )
+      }
+    }
+  }
+
+  private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = {
+
+    val mapping = input
+      .mapWith( s => (s, 1) )
+      .groupBy( 0 )
+      .reduce( (a, b) => (a._1, a._2 + b._2) )
+      .partitionByRange( 1 )
--- End diff --

Indeed, I need to do a global sort, because mapping is a sorted 
DataSet[(String, Long)] of (label, index) pairs, where the most frequent item 
has index = 0.

I need to sort a DataSet of (label, frequency) pairs and then zipWithIndex to 
get the associated index.

I've just realised that sortPartition() will only sort my partitions 
locally, so how can I achieve a global sort this way?





> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in package preprocessing of FlinkML



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652272#comment-15652272
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
@StephanEwen @mxm - Could you please review the proposed change and let me 
know if you are okay with it.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-11-09 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
@StephanEwen @mxm - Could you please review the proposed change and let me 
know if you are okay with it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-4648) Implement bipartite graph generators

2016-11-09 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-4648:
-------------------------------------

Assignee: Ivan Mushketyk

> Implement bipartite graph generators
> 
>
> Key: FLINK-4648
> URL: https://issues.apache.org/jira/browse/FLINK-4648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement generators for bipartite graphs.
> Should implement at least:
> * *BipartiteGraphGenerator* (maybe requires a better name) that will generate 
> a bipartite graph where every vertex of one set is connected only to some 
> vertices of the other set
> * *CompleteBipartiteGraphGenerator* that will generate a graph where every 
> vertex of one set is connected to every vertex of the other set
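For illustration, the second generator boils down to emitting the cross 
product of the two vertex sets (a plain-Java sketch with assumed names, 
independent of Gelly's generator API):

{code}
import java.util.ArrayList;
import java.util.List;

public class CompleteBipartiteSketch {

  static List<long[]> edges(long topCount, long bottomCount) {
    List<long[]> edges = new ArrayList<>();
    for (long t = 0; t < topCount; t++) {
      for (long b = 0; b < bottomCount; b++) {
        edges.add(new long[] {t, b});  // edge from top vertex t to bottom vertex b
      }
    }
    return edges;
  }

  public static void main(String[] args) {
    System.out.println(edges(2, 3).size());  // 2 * 3 = 6 edges
  }
}
{code}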



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2774: [hotfix] PartitionerITCase replace comparing Integers by ...

2016-11-09 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2774
  
Thanks for the fix @BorisOsipov 
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2775: [hotfix] Fix wrong arguments order in assert

2016-11-09 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2775
  
Thanks for the fix! 
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5045) Latency markers emitted after output buffer pool destroyed

2016-11-09 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5045:
--

 Summary: Latency markers emitted after output buffer pool destroyed
 Key: FLINK-5045
 URL: https://issues.apache.org/jira/browse/FLINK-5045
 Project: Flink
  Issue Type: Improvement
Reporter: Ufuk Celebi


In a log of test runs I saw the following warning:

{code}
18:01:42,873 WARN  
org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error while 
emitting latency marker
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:386)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:449)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:740)
at 
org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.run(StreamSource.java:134)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:99)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:740)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:600)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:582)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:383)
... 10 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:149)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:126)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:102)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96)
... 14 more
{code}

See {{2.log}} in 
https://s3.amazonaws.com/flink-logs-us/travis-artifacts/uce/flink/1205/1205.8.tar.gz
 (part of build https://travis-ci.org/uce/flink/jobs/174550925, which failed 
for another reason).

The warning hints at a problem with the life cycle of the latency markers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652165#comment-15652165
 ] 

ASF GitHub Bot commented on FLINK-3030:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2448
  
@rmetzger @iampeter 
I've rebased the code and updated it according to your suggestions.


> Enhance Dashboard to show Execution Attempts
> 
>
> Key: FLINK-3030
> URL: https://issues.apache.org/jira/browse/FLINK-3030
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
> Fix For: 1.0.0
>
>
> Currently, the web dashboard shows only the latest execution attempt. We 
> should make all execution attempts and their accumulators available for 
> inspection.
> The REST monitoring API supports this, so it should be a change only to the 
> frontend part.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...

2016-11-09 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2448
  
@rmetzger @iampeter 
I've rebased the code and updated it according to your suggestions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652132#comment-15652132
 ] 

Fabian Hueske commented on FLINK-3848:
--

Yes, you're both right. The optimizer should identify that the TableSource 
supports projection push-down and push the projection into the TableSource. The 
user should not have to interact with the TableSource directly (apart from 
registering it as a Table in the TableEnvironment). 

IIRC, I had some problems specifying the optimizer rules when I tried to implement 
this feature. I saw [this 
mail|https://lists.apache.org/thread.html/5896f4e834e976f146f280d279bf24c111c32476d96ae48d0f3c0d25@%3Cdev.calcite.apache.org%3E]
 a few days ago on the Calcite dev list. Maybe it helps to define the rule.

> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3613) Add standard deviation, mean, variance to list of Aggregations

2016-11-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652113#comment-15652113
 ] 

Fabian Hueske commented on FLINK-3613:
--

Hi [~anmu], 
this issue proposes to add more built-in aggregation functions to the DataSet 
API. 
Since parts of the Table API are built on the DataSet API, such a feature could 
in principle also be used to implement, for instance, stddev for batch tables.

However, this would only help for batch tables, so we would also need an 
implementation for streaming tables. Also, there are quite a few challenges in 
implementing these aggregation functions for the DataSet API. I think Stephan 
had a good point when he asked whether these advanced functions would be better 
suited for the Table API, which FLINK-4604 is all about.

So, I would rather opt to close this issue in favor of FLINK-4604.

> Add standard deviation, mean, variance to list of Aggregations
> --
>
> Key: FLINK-3613
> URL: https://issues.apache.org/jira/browse/FLINK-3613
> Project: Flink
>  Issue Type: Improvement
>Reporter: Todd Lisonbee
>Priority: Minor
> Attachments: DataSet-Aggregation-Design-March2016-v1.txt
>
>
> Implement standard deviation, mean, variance for 
> org.apache.flink.api.java.aggregation.Aggregations
> Ideally implementation should be single pass and numerically stable.
> References:
> "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et 
> al, International Conference on Data Engineering 2012
> http://dl.acm.org/citation.cfm?id=2310392
> "The Kahan summation algorithm (also known as compensated summation) reduces 
> the numerical errors that occur when adding a sequence of finite precision 
> floating point numbers. Numerical errors arise due to truncation and 
> rounding. These errors can lead to numerical instability when calculating 
> variance."
> https://en.wikipedia.org/wiki/Kahan_summation_algorithm
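
For reference, here is a minimal Scala sketch (plain code, not part of Flink) of 
the two textbook building blocks referenced above: Kahan's compensated summation 
and a single-pass, numerically stable mean/variance (Welford's algorithm).

{code}
// Kahan (compensated) summation: reduces rounding error when summing doubles.
def kahanSum(xs: Iterator[Double]): Double = {
  var sum = 0.0
  var c = 0.0 // running compensation for lost low-order bits
  while (xs.hasNext) {
    val y = xs.next() - c
    val t = sum + y
    c = (t - sum) - y // algebraically zero; captures the rounding error
    sum = t
  }
  sum
}

// Welford's algorithm: single-pass, numerically stable mean and variance.
final class RunningStats {
  private var n = 0L
  private var mean = 0.0
  private var m2 = 0.0

  def add(x: Double): Unit = {
    n += 1
    val delta = x - mean
    mean += delta / n
    m2 += delta * (x - mean) // uses the updated mean
  }

  def average: Double = mean
  def variance: Double = if (n > 1) m2 / (n - 1) else 0.0 // sample variance
  def stddev: Double = math.sqrt(variance)
}
{code}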



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4692) Add tumbling and sliding group-windows for batch tables

2016-11-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652063#comment-15652063
 ] 

Fabian Hueske commented on FLINK-4692:
--

Hi [~jark], I see two ways to implement sliding windows for batch:

1. Replicate each input record in order to assign keys for all overlapping 
windows it belongs to. This is probably the more straightforward implementation 
and supports any aggregation function, but it blows up the data volume.
2. If the aggregation functions are combinable / pre-aggregatable, we can also 
find the largest tumbling window size from which the sliding windows can be 
assembled. This is basically the technique used to express sliding windows in 
plain SQL (GROUP BY + OVER clauses). For a sliding window {{Slide(10 minutes, 2 
minutes)}} this would mean first computing aggregates of non-overlapping 
(tumbling) 2-minute windows and then assembling 5 consecutive ones into each 
sliding window (this could be done in a MapPartition with sorted input). The 
implementation could be an optimizer rule that splits the sliding aggregate into 
a tumbling aggregate and a SQL WINDOW operator; see the sketch below for the 
pane arithmetic. Maybe it makes sense to implement the WINDOW clause first and 
reuse it for sliding windows.

OK, given the complexity of sliding group-windows, I think it makes sense to 
split this issue into tumbling and sliding windows.
What do you think [~jark]?
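
To make option 2 concrete, here is a small sketch of the pane arithmetic in 
plain Scala (not an actual operator): for {{Slide(10 minutes, 2 minutes)}}, each 
record is first assigned to a 2-minute tumbling pane, and each pane contributes 
to exactly 5 sliding windows.

{code}
val slideMs = 2 * 60 * 1000L   // pane (tumbling window) size
val sizeMs  = 10 * 60 * 1000L  // sliding window size
val panesPerWindow = (sizeMs / slideMs).toInt // = 5

// Start of the tumbling pane a record with this timestamp falls into.
def paneStart(ts: Long): Long = ts - (ts % slideMs)

// Start timestamps of the sliding windows this pane contributes to.
def windowStarts(pane: Long): Seq[Long] =
  (0 until panesPerWindow).map(i => pane - i * slideMs).filter(_ >= 0)
{code}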

> Add tumbling and sliding group-windows for batch tables
> ---
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> Add Tumble and Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652028#comment-15652028
 ] 

Fabian Hueske commented on FLINK-4832:
--

It is true that Flink does not support {{null}} fields in record types, i.e., 
Flink's Java Tuples and Scala's Tuples.
However, the {{Row}} type used for aggregations does support {{null}} fields. 

Have a look at how the {{DataSetValues}} class creates a {{DataSet}} for a 
given set of row values.
This technique and the {{ValuesInputFormat}} can be used to create a DataSet 
with a single row with all fields being {{null}}.
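
For illustration, a rough Scala sketch of the idea (this is not the actual 
{{DataSetValues}} / {{ValuesInputFormat}} code, and the {{Row}} import is an 
assumption, as its package has moved between versions): create a one-element 
DataSet whose single {{Row}} has all fields {{null}}, and union it with the 
aggregation input so that empty inputs still yield a record.

{code}
import org.apache.flink.api.scala._
import org.apache.flink.types.Row // assumption: package may differ per version

val env = ExecutionEnvironment.getExecutionEnvironment

// One Row with all fields null (Row fields default to null).
// In the real code the arity must match the aggregation schema, and an
// explicit TypeInformation[Row] is needed, which DataSetValues provides.
val arity = 3 // e.g. three aggregate fields
val nullRow: DataSet[Row] = env.fromElements(new Row(arity))
{code}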

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this. Maybe by unioning the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2288: Feature/s3 a fix

2016-11-09 Thread cresny
Github user cresny commented on the issue:

https://github.com/apache/flink/pull/2288
  
@uce I finally got around to fixing this. Regarding your above comments:

- I think we got the "flattening" concern backwards. What I meant and 
wanted to avoid was moving all files into a single directory. The calling code 
asks to simply upload the lib dir, and I think it should be copied structurally 
intact. I think I saw a complaint on the user list that properties files were 
not moved -- this should fix that. Or am I missing some other concern?

- The new commit 26c8511701d6b852a3c6f8b306f4b5da8e7b8479 now only modifies 
flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java

- I kept the check for file:// scheme for now just because I think it's 
safer to check than assume it's set down the call stack but I can hunt down and 
change those calls if you think it's best. 

Since this PR is pretty tortured, maybe I should create a new one with just 
the above commit?




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2776: [docs] Fix typos in Table API documentation

2016-11-09 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2776
  
Good catches @wuchong!
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5038) Errors in the "cancelTask" method prevent closeables from being closed early

2016-11-09 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5038.
---

> Errors in the "cancelTask" method prevent closeables from being closed early
> 
>
> Key: FLINK-5038
> URL: https://issues.apache.org/jira/browse/FLINK-5038
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.1.4
>
>
> The title says it all :-)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5038) Errors in the "cancelTask" method prevent closeables from being closed early

2016-11-09 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5038.
-
Resolution: Fixed

Fixed in
  - 1.2.0 via 616c4f5e483f0fd81ce2db05e911f01f15a0b583
  - 1.1.4 via 290f8a25fc4127b9734f45e782391506207748bc

> Errors in the "cancelTask" method prevent closeables from being closed early
> 
>
> Key: FLINK-5038
> URL: https://issues.apache.org/jira/browse/FLINK-5038
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.1.4
>
>
> The title says it all :-)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5044) Converting operator and function state from Flink 1.1 for all changed operators in 1.2

2016-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5044:
-

 Summary: Converting operator and function state from Flink 1.1 for 
all changed operators in 1.2
 Key: FLINK-5044
 URL: https://issues.apache.org/jira/browse/FLINK-5044
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming, Windowing Operators
Affects Versions: 1.2.0
Reporter: Stefan Richter


Snapshot/restore mechanics for operators changed significantly between Flink 
1.1 and Flink 1.2. Furthermore, operators and their hierarchy also changed. We 
need to ensure that old operators can still restore state from their old 
version.

In particular, WindowOperator and KafkaConsumer are currently affected by this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5043) Converting keyed state from Flink 1.1 backend implementations to their new counterparts in 1.2

2016-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5043:
-

 Summary: Converting keyed state from Flink 1.1 backend 
implementations to their new counterparts in 1.2
 Key: FLINK-5043
 URL: https://issues.apache.org/jira/browse/FLINK-5043
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stefan Richter
Assignee: Stefan Richter


Keyed state backends became key-group aware in Flink 1.2, and their hierarchy 
as a whole changed significantly. We need to implement a conversion so that old 
snapshots can be restored into new backends.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5042) Convert old savepoints to new savepoints

2016-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5042:
-

 Summary: Convert old savepoints to new savepoints
 Key: FLINK-5042
 URL: https://issues.apache.org/jira/browse/FLINK-5042
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stefan Richter
Assignee: Stefan Richter


The format of savepoints and the hierarchy of state handles changed a lot 
between Flink 1.1 and 1.2. For backwards compatibility, we need to convert old 
to new savepoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5041) Implement savepoint backwards compatibility 1.1 -> 1.2

2016-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5041:
-

 Summary: Implement savepoint backwards compatibility 1.1 -> 1.2
 Key: FLINK-5041
 URL: https://issues.apache.org/jira/browse/FLINK-5041
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stefan Richter
Assignee: Stefan Richter


This issue tracks the implementation of backwards compatibility between Flink 
1.1 and 1.2 releases.

This task subsumes:

- Converting old savepoints to new savepoints, including a conversion of state 
handles to their new replacement.

- Converting keyed state from old backend implementations to their new 
counterparts.

- Converting operator and function state for all changed operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4946) Load jar files from subdirectories of lib

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651848#comment-15651848
 ] 

ASF GitHub Bot commented on FLINK-4946:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2708
  
Thanks @greghogan! The changes look good. 

+1 to merge


> Load jar files from subdirectories of lib
> -
>
> Key: FLINK-4946
> URL: https://issues.apache.org/jira/browse/FLINK-4946
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.2.0
>
>
> Users can more easily track Flink jars with transitive dependencies when 
> copied into subdirectories of {{lib}}. This is the arrangement of {{opt}} for 
> FLINK-4861.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2708: [FLINK-4946] [scripts] Load jar files from subdirectories...

2016-11-09 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2708
  
Thanks @greghogan! The changes look good. 

+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651812#comment-15651812
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2564
  
Try switching to `ExecutionEnvironment.createCollectionsEnvironment()`.
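
The collection environment executes the whole program in a single JVM without 
Flink's network stack, so no network buffers are involved. A minimal sketch:

```
import org.apache.flink.api.scala._

// Collection-based execution: single JVM, no network buffers,
// suitable for small test inputs.
val env = ExecutionEnvironment.createCollectionsEnvironment
val doubled = env.fromElements(1, 2, 3).map(_ * 2).collect()
```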


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph whose vertices can be divided into two disjoint 
> sets such that each edge has a source vertex in the first set and a target 
> vertex in the second set. We would like to support efficient operations for 
> this type of graph along with a set of metrics 
> (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

2016-11-09 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2564
  
Try switching to `ExecutionEnvironment.createCollectionsEnvironment()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4521) Fix "Submit new Job" panel in development mode

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651797#comment-15651797
 ] 

ASF GitHub Bot commented on FLINK-4521:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2431
  
Hi @iampeter 
I've fixed the code according to your review.
Could you please review it again?

Best regards,
Ivan.


> Fix "Submit new Job" panel in development mode
> --
>
> Key: FLINK-4521
> URL: https://issues.apache.org/jira/browse/FLINK-4521
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> If web frontend is started in the development mode, "Submit new Job" panel is 
> empty.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2431: [FLINK-4521][web frontend] Fix "Submit new Job" panel in ...

2016-11-09 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2431
  
Hi @iampeter 
I've fixed the code according to your review.
Could you please review it again?

Best regards,
Ivan.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651760#comment-15651760
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
New **gelly** tests failed with errors like:

> Caused by: java.io.IOException: Insufficient number of network buffers: 
required 32, but only 3 available. The total number of network buffers is 
currently set to 2048. You can increase this number by setting the 
configuration key 'taskmanager.network.numberOfBuffers'.

Do you know what is causing this error? Should I update the code somehow?
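
For reference, the key from the error message can be raised through a 
Configuration when starting the local/mini cluster for the tests (a sketch; 
where the Configuration is passed in depends on the test harness):

```
import org.apache.flink.configuration.Configuration

val conf = new Configuration()
// Key taken verbatim from the error message; 2048 is the value it reports.
conf.setInteger("taskmanager.network.numberOfBuffers", 4096)
// Pass `conf` to the local/mini cluster used by the test base.
```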


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph whose vertices can be divided into two disjoint 
> sets such that each edge has a source vertex in the first set and a target 
> vertex in the second set. We would like to support efficient operations for 
> this type of graph along with a set of metrics 
> (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

2016-11-09 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
New **gelly** tests failed with errors like:

> Caused by: java.io.IOException: Insufficient number of network buffers: 
required 32, but only 3 available. The total number of network buffers is 
currently set to 2048. You can increase this number by setting the 
configuration key 'taskmanager.network.numberOfBuffers'.

Do you know what is causing this error? Should I update the code somehow?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4946) Load jar files from subdirectories of lib

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651634#comment-15651634
 ] 

ASF GitHub Bot commented on FLINK-4946:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2708
  
@mxm I pushed a new commit that is working in YARN with recursive 
directories. The issue looks to have been that YARN was copying files 
recursively, but the Java classpath can only contain simple wildcards `*`, 
not recursive wildcards `**`.
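
In other words, `lib/*` picks up jars directly in `lib` but nothing in nested 
directories, so each subdirectory needs its own wildcard entry. A plain-Scala 
sketch of that expansion (not the actual script or YARN code):

```
import java.io.File

// One "<dir>/*" entry per subdirectory of lib, since the JVM
// expands "*" only one level deep.
def libClasspath(lib: File): String = {
  val subDirs = Option(lib.listFiles()).getOrElse(Array.empty[File]).filter(_.isDirectory)
  (s"${lib.getPath}/*" +: subDirs.map(d => s"${d.getPath}/*"))
    .mkString(File.pathSeparator)
}
```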


> Load jar files from subdirectories of lib
> -
>
> Key: FLINK-4946
> URL: https://issues.apache.org/jira/browse/FLINK-4946
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.2.0
>
>
> Users can more easily track Flink jars with transitive dependencies when 
> copied into subdirectories of {{lib}}. This is the arrangement of {{opt}} for 
> FLINK-4861.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2708: [FLINK-4946] [scripts] Load jar files from subdirectories...

2016-11-09 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2708
  
@mxm I pushed a new commit that is working in YARN with recursive 
directories. The issue looks to have been that YARN was copying files 
recursively, but the Java classpath can only contain simple wildcards `*`, 
not recursive wildcards `**`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2778: [hotfix] fix duplicate "ms" time unit

2016-11-09 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/2778

[hotfix] fix duplicate "ms" time unit

as in "Restart with fixed delay (1 ms ms)." in the web interface under 
"Max. number of execution retries"
(org.apache.flink.api.common.time.Time already prints the time unit)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink hotfix_2016-11-09

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2778.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2778


commit 8b9865a2990aae77b38115bd27a6868ef00d8534
Author: Nico Kruber 
Date:   2016-11-09T16:54:43Z

[hotfix] fix duplicate "ms" time unit

For example: "Restart with fixed delay (1 ms ms)."
-> org.apache.flink.api.common.time.Time already prints the time unit.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5040) Set correct input channel types with eager scheduling

2016-11-09 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5040:
--

 Summary: Set correct input channel types with eager scheduling
 Key: FLINK-5040
 URL: https://issues.apache.org/jira/browse/FLINK-5040
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 1.2.0, 1.1.4


When we do eager deployment, all intermediate stream/partition locations are 
already known when scheduling an intermediate stream/partition consumer. 
Nonetheless, we saw tasks with "unknown input channels" that were updated lazily 
at runtime. This was caused by a wrong producer execution state check that 
required the producers to be in RUNNING or DEPLOYING state when creating 
consumer input channels.

(We had a bogus fix for this in FLINK-3232. With that "fix" we actually did not 
fix anything correctly and instead doubled the number of schedule or update 
consumer messages we sent.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-09 Thread Anton Solovev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651319#comment-15651319
 ] 

Anton Solovev commented on FLINK-3848:
--

So do we need to create another ProjectableTableScan with a rule for it?

> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3613) Add standard deviation, mean, variance to list of Aggregations

2016-11-09 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651203#comment-15651203
 ] 

Anton Mushin edited comment on FLINK-3613 at 11/9/16 3:38 PM:
--

Hello everyone.
Does FLINK-4604 extend this issue? Or is FLINK-4604 part of this issue for the 
Table API?


was (Author: anmu):
Hello everyone.
Does FLINK-4604 extend this issue? Or is FLINK-4604 part of this issue for 
Flink SQL?

> Add standard deviation, mean, variance to list of Aggregations
> --
>
> Key: FLINK-3613
> URL: https://issues.apache.org/jira/browse/FLINK-3613
> Project: Flink
>  Issue Type: Improvement
>Reporter: Todd Lisonbee
>Priority: Minor
> Attachments: DataSet-Aggregation-Design-March2016-v1.txt
>
>
> Implement standard deviation, mean, variance for 
> org.apache.flink.api.java.aggregation.Aggregations
> Ideally implementation should be single pass and numerically stable.
> References:
> "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et 
> al, International Conference on Data Engineering 2012
> http://dl.acm.org/citation.cfm?id=2310392
> "The Kahan summation algorithm (also known as compensated summation) reduces 
> the numerical errors that occur when adding a sequence of finite precision 
> floating point numbers. Numerical errors arise due to truncation and 
> rounding. These errors can lead to numerical instability when calculating 
> variance."
> https://en.wikipedia.org/wiki/Kahan_summation_algorithm



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3613) Add standard deviation, mean, variance to list of Aggregations

2016-11-09 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651203#comment-15651203
 ] 

Anton Mushin commented on FLINK-3613:
-

Hello everyone.
Does FLINK-4604 extend this issue? Or is FLINK-4604 part of this issue for 
Flink SQL?

> Add standard deviation, mean, variance to list of Aggregations
> --
>
> Key: FLINK-3613
> URL: https://issues.apache.org/jira/browse/FLINK-3613
> Project: Flink
>  Issue Type: Improvement
>Reporter: Todd Lisonbee
>Priority: Minor
> Attachments: DataSet-Aggregation-Design-March2016-v1.txt
>
>
> Implement standard deviation, mean, variance for 
> org.apache.flink.api.java.aggregation.Aggregations
> Ideally implementation should be single pass and numerically stable.
> References:
> "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et 
> al, International Conference on Data Engineering 2012
> http://dl.acm.org/citation.cfm?id=2310392
> "The Kahan summation algorithm (also known as compensated summation) reduces 
> the numerical errors that occur when adding a sequence of finite precision 
> floating point numbers. Numerical errors arise due to truncation and 
> rounding. These errors can lead to numerical instability when calculating 
> variance."
> https://en.wikipedia.org/wiki/Kahan_summation_algorithm



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87202604
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -675,7 +756,69 @@ object ALS {
   collector.collect((blockID, array))
 }
   }
-}.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+}
+
+// broadcasting XtX matrix in the implicit case
+val updatedFactorMatrix = if (implicitPrefs) {
+  newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+} else {
+  newMatrix
+}
+
+
updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+  }
+
+  /**
+* Computes the XtX matrix for the implicit version before updating the 
factors.
+* This matrix is intended to be broadcast, but as we cannot use a sink 
inside a Flink
+* iteration, we represent it as a [[DataSet]] with a single element 
containing the matrix.
+*
+* The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+* then sums all these computed matrices to get `X^T * X`.
+*/
+  private[recommendation] def computeXtX(x: DataSet[(Int, 
Array[Array[Double]])], factors: Int):
+  DataSet[Array[Double]] = {
+val triangleSize = factors * (factors - 1) / 2 + factors
+
+type MtxBlock = (Int, Array[Array[Double]])
+// construct XtX for all blocks
+val xtx = x
+  .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+var xtxForBlock: Array[Double] = null
+
+override def mapPartition(blocks: Iterable[(Int, 
Array[Array[Double]])],
+  out: Collector[Array[Double]]): Unit = {
+
+  if (xtxForBlock == null) {
+// creating the matrix if not yet created
+xtxForBlock = Array.fill(triangleSize)(0.0)
+  } else {
+// erasing the matrix
+var i = 0
+while (i < xtxForBlock.length) {
--- End diff --

Any reason why `fill` is not/cannot be used here?
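
Presumably the suggestion refers to something along these lines, replacing the 
manual zeroing loop:

```
// Equivalent to the while loop that erases the matrix:
java.util.Arrays.fill(xtxForBlock, 0.0)
```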


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87199446
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -273,6 +308,14 @@ object ALS {
 val defaultValue: Option[Int] = Some(10)
   }
 
+  case object ImplicitPrefs extends Parameter[Boolean] {
--- End diff --

Can't find a way to comment on line 264/299, but we should take the 
opportunity to set the default number of factors to a more reasonable 50, and 
add the following recommendation to the docstring and documentation:

> we recommend working with the highest number of factors feasible within 
computational limitations.

Which comes straight from the iALS paper.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5037) Instability in AbstractUdfStreamOperatorLifecycleTest

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651166#comment-15651166
 ] 

ASF GitHub Bot commented on FLINK-5037:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/2777


> Instability in AbstractUdfStreamOperatorLifecycleTest
> -
>
> Key: FLINK-5037
> URL: https://issues.apache.org/jira/browse/FLINK-5037
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> I saw this failure: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/174340963/log.txt
> I suspect it has to do with the {{Thread.sleep()}} in Line 237. I think it 
> can be replaced by {{runStarted.await()}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5037) Instability in AbstractUdfStreamOperatorLifecycleTest

2016-11-09 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-5037.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

> Instability in AbstractUdfStreamOperatorLifecycleTest
> -
>
> Key: FLINK-5037
> URL: https://issues.apache.org/jira/browse/FLINK-5037
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> I saw this failure: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/174340963/log.txt
> I suspect it has to do with the {{Thread.sleep()}} in Line 237. I think it 
> can be replaced by {{runStarted.await()}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2777: [FLINK-5037] Fixed instability in AbstractUdfStrea...

2016-11-09 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/2777


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4613) Extend ALS to handle implicit feedback datasets

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651161#comment-15651161
 ] 

ASF GitHub Bot commented on FLINK-4613:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87199446
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -273,6 +308,14 @@ object ALS {
 val defaultValue: Option[Int] = Some(10)
   }
 
+  case object ImplicitPrefs extends Parameter[Boolean] {
--- End diff --

Can't find a way to comment on line 264/299, but we should take the 
opportunity to set the default number of factors to a more reasonable 50, and 
add the following recommendation to the docstring and documentation:

> we recommend working with the highest number of factors feasible within 
computational limitations.

Which comes straight from the iALS paper.


> Extend ALS to handle implicit feedback datasets
> ---
>
> Key: FLINK-4613
> URL: https://issues.apache.org/jira/browse/FLINK-4613
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Gábor Hermann
>Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle 
> _implicit feedback_ datasets. These datasets do not contain explicit ratings 
> by users, they are rather built by collecting user behavior (e.g. user 
> listened to artist X for Y minutes), and they require a slightly different 
> optimization objective. See details by [Hu et 
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See [Spark ALS 
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
>  which could be a basis for this extension. Only the updating factor part is 
> modified, and most of the changes are in the local parts of the algorithm 
> (i.e. UDFs). In fact, the only modification that is not local, is 
> precomputing a matrix product Y^T * Y and broadcasting it to all the nodes, 
> which we can do with broadcast DataSets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4613) Extend ALS to handle implicit feedback datasets

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651159#comment-15651159
 ] 

ASF GitHub Bot commented on FLINK-4613:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87201508
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -535,8 +581,17 @@ object ALS {
 itemOut: DataSet[(Int, OutBlockInformation)],
 userIn: DataSet[(Int, InBlockInformation)],
 factors: Int,
-lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
+lambda: Double, blockIDPartitioner: FlinkPartitioner[Int],
+implicitPrefs: Boolean,
+alpha: Double):
   DataSet[(Int, Array[Array[Double]])] = {
+// retrieve broadcast XtX matrix in implicit case
+val XtXtoBroadcast = if (implicitPrefs) {
--- End diff --

I'm a bit confused by the notation here: is this matrix the `YtY` matrix 
from the paper? If so, I would recommend sticking to the paper's notation 
to avoid confusion.


> Extend ALS to handle implicit feedback datasets
> ---
>
> Key: FLINK-4613
> URL: https://issues.apache.org/jira/browse/FLINK-4613
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Gábor Hermann
>Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle 
> _implicit feedback_ datasets. These datasets do not contain explicit ratings 
> by users, they are rather built by collecting user behavior (e.g. user 
> listened to artist X for Y minutes), and they require a slightly different 
> optimization objective. See details by [Hu et 
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See [Spark ALS 
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
>  which could be a basis for this extension. Only the updating factor part is 
> modified, and most of the changes are in the local parts of the algorithm 
> (i.e. UDFs). In fact, the only modification that is not local, is 
> precomputing a matrix product Y^T * Y and broadcasting it to all the nodes, 
> which we can do with broadcast DataSets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4613) Extend ALS to handle implicit feedback datasets

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651160#comment-15651160
 ] 

ASF GitHub Bot commented on FLINK-4613:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87202604
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -675,7 +756,69 @@ object ALS {
   collector.collect((blockID, array))
 }
   }
-}.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+}
+
+// broadcasting XtX matrix in the implicit case
+val updatedFactorMatrix = if (implicitPrefs) {
+  newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+} else {
+  newMatrix
+}
+
+
updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+  }
+
+  /**
+* Computes the XtX matrix for the implicit version before updating the 
factors.
+* This matrix is intended to be broadcast, but as we cannot use a sink 
inside a Flink
+* iteration, we represent it as a [[DataSet]] with a single element 
containing the matrix.
+*
+* The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+* then sums all these computed matrices to get `X^T * X`.
+*/
+  private[recommendation] def computeXtX(x: DataSet[(Int, 
Array[Array[Double]])], factors: Int):
+  DataSet[Array[Double]] = {
+val triangleSize = factors * (factors - 1) / 2 + factors
+
+type MtxBlock = (Int, Array[Array[Double]])
+// construct XtX for all blocks
+val xtx = x
+  .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+var xtxForBlock: Array[Double] = null
+
+override def mapPartition(blocks: Iterable[(Int, 
Array[Array[Double]])],
+  out: Collector[Array[Double]]): Unit = {
+
+  if (xtxForBlock == null) {
+// creating the matrix if not yet created
+xtxForBlock = Array.fill(triangleSize)(0.0)
+  } else {
+// erasing the matrix
+var i = 0
+while (i < xtxForBlock.length) {
--- End diff --

Any reason why `fill` is not/cannot be used here?


> Extend ALS to handle implicit feedback datasets
> ---
>
> Key: FLINK-4613
> URL: https://issues.apache.org/jira/browse/FLINK-4613
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Gábor Hermann
>Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle 
> _implicit feedback_ datasets. These datasets do not contain explicit ratings 
> by users, they are rather built by collecting user behavior (e.g. user 
> listened to artist X for Y minutes), and they require a slightly different 
> optimization objective. See details by [Hu et 
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See [Spark ALS 
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
>  which could be a basis for this extension. Only the updating factor part is 
> modified, and most of the changes are in the local parts of the algorithm 
> (i.e. UDFs). In fact, the only modification that is not local, is 
> precomputing a matrix product Y^T * Y and broadcasting it to all the nodes, 
> which we can do with broadcast DataSets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4613) Extend ALS to handle implicit feedback datasets

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651162#comment-15651162
 ] 

ASF GitHub Bot commented on FLINK-4613:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87195483
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -156,6 +171,26 @@ class ALS extends Predictor[ALS] {
 this
   }
 
+  /** Sets the input observations to be implicit, thus using the iALS 
algorithm for learning.
--- End diff --

The docstring is not worded correctly, as the passed argument could be true 
or false.

It should be prefixed with something like "When set to true, we assume 
implicit observations..."


> Extend ALS to handle implicit feedback datasets
> ---
>
> Key: FLINK-4613
> URL: https://issues.apache.org/jira/browse/FLINK-4613
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Gábor Hermann
>Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle 
> _implicit feedback_ datasets. These datasets do not contain explicit ratings 
> by users, they are rather built by collecting user behavior (e.g. user 
> listened to artist X for Y minutes), and they require a slightly different 
> optimization objective. See details by [Hu et 
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See [Spark ALS 
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
>  which could be a basis for this extension. Only the updating factor part is 
> modified, and most of the changes are in the local parts of the algorithm 
> (i.e. UDFs). In fact, the only modification that is not local, is 
> precomputing a matrix product Y^T * Y and broadcasting it to all the nodes, 
> which we can do with broadcast DataSets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87195483
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -156,6 +171,26 @@ class ALS extends Predictor[ALS] {
 this
   }
 
+  /** Sets the input observations to be implicit, thus using the iALS 
algorithm for learning.
--- End diff --

The docstring is not worded correctly, as the passed argument could be true 
or false.

It should be prefixed with something like "When set to true, we assume 
implicit observations..."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87201508
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -535,8 +581,17 @@ object ALS {
 itemOut: DataSet[(Int, OutBlockInformation)],
 userIn: DataSet[(Int, InBlockInformation)],
 factors: Int,
-lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
+lambda: Double, blockIDPartitioner: FlinkPartitioner[Int],
+implicitPrefs: Boolean,
+alpha: Double):
   DataSet[(Int, Array[Array[Double]])] = {
+// retrieve broadcast XtX matrix in implicit case
+val XtXtoBroadcast = if (implicitPrefs) {
--- End diff --

I'm a bit confused by the notation here: is this matrix the `YtY` matrix 
from the paper? If so, I would recommend sticking to the paper's notation 
to avoid confusion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2777: [FLINK-5037] Fixed instability in AbstractUdfStreamOperat...

2016-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2777
  
Thanks for the fix! 👍 Could you please close the issue and this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5037) Instability in AbstractUdfStreamOperatorLifecycleTest

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651140#comment-15651140
 ] 

ASF GitHub Bot commented on FLINK-5037:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2777
  
Thanks for the fix!  Could you please close the issue and this PR?


> Instability in AbstractUdfStreamOperatorLifecycleTest
> -
>
> Key: FLINK-5037
> URL: https://issues.apache.org/jira/browse/FLINK-5037
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>
> I saw this failure: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/174340963/log.txt
> I suspect it has to do with the {{Thread.sleep()}} in Line 237. I think it 
> can be replaced by {{runStarted.await()}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87201308
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/JobClient.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.util.Map;
+
+/**
+ * A Flink job client interface to interact with running Flink jobs.
+ */
+public interface JobClient {
+
+   /**
+* Gets the JobID associated with this JobClient.
+*/
+   JobID getJobID();
+
+   /**
+* Returns a boolean indicating whether the job execution has finished.
+*/
+   boolean hasFinished() throws Exception;
+
+   /**
+* Blocks until the result of the job execution is returned.
+*/
+   JobExecutionResult waitForResult() throws Exception;
+
+   /**
+* Gets the accumulator map of a running job.
+*/
+   Map<String, Object> getAccumulators() throws Exception;
+
+   /**
+* Cancels a running job.
+*/
+   void cancel() throws Exception;
+
+   /**
+* Stops a running job if the job supports stopping.
+*/
+   void stop() throws Exception;
+
+   /**
+* Adds a Runnable to this JobClient to be called
+* when the client is shut down. Runnables are called
+* in the order they are added.
+*/
+   void addFinalizer(Runnable finalizer) throws Exception;
+
+   /**
+* Runs finalization code to shutdown the client
+* and its dependencies.
+*/
+   void shutdown();
--- End diff --

Shutdown seems to be an internal implementation detail for the new 
finalizers. It should therefore not be in the public API; it even seems 
problematic to allow users to call it, because it would prematurely run the 
finalizers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4272) Create a JobClient for job control and monitoring

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651106#comment-15651106
 ] 

ASF GitHub Bot commented on FLINK-4272:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87201308
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/JobClient.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.util.Map;
+
+/**
+ * A Flink job client interface to interact with running Flink jobs.
+ */
+public interface JobClient {
+
+   /**
+* Gets the JobID associated with this JobClient.
+*/
+   JobID getJobID();
+
+   /**
+* Returns a boolean indicating whether the job execution has finished.
+*/
+   boolean hasFinished() throws Exception;
+
+   /**
+* Blocks until the result of the job execution is returned.
+*/
+   JobExecutionResult waitForResult() throws Exception;
+
+   /**
+* Gets the accumulator map of a running job.
+*/
+   Map<String, Object> getAccumulators() throws Exception;
+
+   /**
+* Cancels a running job.
+*/
+   void cancel() throws Exception;
+
+   /**
+* Stops a running job if the job supports stopping.
+*/
+   void stop() throws Exception;
+
+   /**
+* Adds a Runnable to this JobClient to be called
+* when the client is shut down. Runnables are called
+* in the order they are added.
+*/
+   void addFinalizer(Runnable finalizer) throws Exception;
+
+   /**
+* Runs finalization code to shutdown the client
+* and its dependencies.
+*/
+   void shutdown();
--- End diff --

Shutdown seems to be an internal implementation detail for the new 
finalizers. It should therefore not be in the public API; it even seems 
problematic to allow users to call it, because it would prematurely run the 
finalizers.


> Create a JobClient for job control and monitoring 
> --
>
> Key: FLINK-4272
> URL: https://issues.apache.org/jira/browse/FLINK-4272
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The aim of this new features is to expose a client to the user which allows 
> to cancel a running job, retrieve accumulators for a running job, or perform 
> other actions in the future. Let's call it {{JobClient}} for now (although 
> this clashes with the existing JobClient class which could be renamed to 
> JobClientActorUtils instead).
> The new client should be returned from the {{ClusterClient}} class upon job 
> submission. The client should also be instantiatable by the users to retrieve 
> the JobClient with a JobID.
> We should expose the new JobClient to the Java and Scala APIs using a new 
> method on the {{ExecutionEnvironment}} / {{StreamExecutionEnvironment}} 
> called {{executeWithControl()}} (perhaps we can find a better name).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2729: [FLINK-4883]Prevent UDFs implementations through S...

2016-11-09 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2729#discussion_r87201177
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ScalaObjectChecker.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+
+/**
+ * Checks whether a class is implemented as a Scala object
+ * (i.e. a Scala singleton).
+ */
+@Internal
+public class ScalaObjectChecker {
+   public static boolean isPotentialScalaObject(Object o) {
--- End diff --

I think there might be a nicer/idiomatic way of doing this check through:
```
def isSingleton[A](a: A)(implicit ev: A <:< Singleton = null) = 
  Option(ev).isDefined
```
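
For reference, a hedged Java sketch of a reflection-based variant of such a 
check (not the PR's actual implementation): a Scala `object Foo` compiles to 
a class `Foo$` with a static `MODULE$` field holding the singleton instance.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class ScalaObjectCheckSketch {

    // True if o looks like the singleton instance of a Scala object.
    static boolean isPotentialScalaObject(Object o) {
        Class<?> clazz = o.getClass();
        if (!clazz.getName().endsWith("$")) {
            return false;
        }
        try {
            Field module = clazz.getField("MODULE$");
            return Modifier.isStatic(module.getModifiers()) && module.get(null) == o;
        } catch (NoSuchFieldException | IllegalAccessException e) {
            return false;
        }
    }
}
```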


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2729: [FLINK-4883]Prevent UDFs implementations through S...

2016-11-09 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2729#discussion_r87202166
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -181,7 +181,17 @@ protected void fillInType(TypeInformation<T> typeInfo) 
{
return this.type;
}
 
+   /**
+ *  1. Checks whether the function is implemented by a Scala object. The check
+ *runs only if forbidding Scala-object functions is not disabled in the
+ *[[org.apache.flink.api.common.ExecutionConfig]].
+ *
+ *  2. Returns a "closure-cleaned" version of the given function. Cleans only
+ * if closure cleaning is not disabled in the
+ *[[org.apache.flink.api.common.ExecutionConfig]].
+ */
public <F> F clean(F f) {
--- End diff --

I suggest moving this check into a separate method like:

```public <F> F checkNotSingleton(F f)```

One method should typically have one single duty, and the duty of a 
cleaning function should not be checking for singleton objects. Both functions 
could then be chained as `clean(check(f))`.
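
A hedged sketch of that split inside `DataSet` (the config accessor name is 
made up; only `InvalidProgramException` and `getExecutionEnvironment()` are 
existing API):

```java
// Single-purpose check, chained with clean() at the call site.
public <F> F checkNotSingleton(F f) {
    // isScalaObjectCheckDisabled() is a hypothetical ExecutionConfig flag.
    if (!getExecutionEnvironment().getConfig().isScalaObjectCheckDisabled()
            && ScalaObjectChecker.isPotentialScalaObject(f)) {
        throw new InvalidProgramException(
                "UDFs must not be implemented as Scala singleton objects: "
                        + f.getClass().getName());
    }
    return f;
}

// call site: map(clean(checkNotSingleton(mapper)))
```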


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651101#comment-15651101
 ] 

ASF GitHub Bot commented on FLINK-4883:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2729#discussion_r87201177
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ScalaObjectChecker.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+
+/**
+ * Checks whether a class is implemented as a Scala object
+ * (i.e. a Scala singleton).
+ */
+@Internal
+public class ScalaObjectChecker {
+   public static boolean isPotentialScalaObject(Object o) {
--- End diff --

I think there might be a nicer/idiomatic way of doing this check through:
```
def isSingleton[A](a: A)(implicit ev: A <:< Singleton = null) = 
  Option(ev).isDefined
```


> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems as the UDF is now a singleton that Flink 
> could use across several operator instances, causing job failures. We 
> should detect and prevent the usage of singleton UDFs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651100#comment-15651100
 ] 

ASF GitHub Bot commented on FLINK-4883:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2729#discussion_r87202166
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -181,7 +181,17 @@ protected void fillInType(TypeInformation<T> typeInfo) 
{
return this.type;
}
 
+   /**
+ *  1. Checks whether the function is implemented by a Scala object. The check
+ *runs only if forbidding Scala-object functions is not disabled in the
+ *[[org.apache.flink.api.common.ExecutionConfig]].
+ *
+ *  2. Returns a "closure-cleaned" version of the given function. Cleans only
+ * if closure cleaning is not disabled in the
+ *[[org.apache.flink.api.common.ExecutionConfig]].
+ */
public <F> F clean(F f) {
--- End diff --

I suggest moving this check into a separate method like:

```public <F> F checkNotSingleton(F f)```

One method should typically have one single duty, and the duty of a 
cleaning function should not be checking for singleton objects. Both functions 
could then be chained as `clean(check(f))`.


> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems as the UDF is now a singleton that Flink 
> could use across several operator instances, causing job failures. We 
> should detect and prevent the usage of singleton UDFs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87050071
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java ---
@@ -207,14 +207,23 @@ public JobExecutionResult 
executePlanWithJars(JobWithJars program) throws Except
shutDownAtEnd = false;
}
 
-   try {
-   return client.run(program, 
defaultParallelism).getJobExecutionResult();
-   }
-   finally {
-   if (shutDownAtEnd) {
-   stop();
-   }
-   }
+   final JobClient jobClient = client.run(program, 
defaultParallelism);
+
+   jobClient.addFinalizer(
+   new Runnable() {
+   @Override
+   public void run() {
+   if (shutDownAtEnd) {
+   try {
+   stop();
+   } catch (Exception e) {
+   throw new 
RuntimeException("Failed to clean up.", e);
--- End diff --

Same here with the exception. I think it is not good practice to 
masquerade checked exceptions as unchecked exceptions, because it 
violates the contract defined by the `Runnable` interface.
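
A hedged alternative sketch: give finalizers their own functional interface 
that may throw, so nothing needs to be wrapped (the `Finalizer` name is made 
up):

```java
import java.util.List;
import org.slf4j.Logger;

// A Runnable-like hook that is allowed to throw checked exceptions.
interface Finalizer {
    void run() throws Exception;
}

final class FinalizerRunner {
    // Runs all finalizers, logging failures instead of aborting the shutdown.
    static void runAll(List<Finalizer> finalizers, Logger log) {
        for (Finalizer finalizer : finalizers) {
            try {
                finalizer.run();
            } catch (Exception e) {
                log.warn("Finalizer failed during shutdown.", e);
            }
        }
    }
}
```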


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87052872
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
+ */
+public class JobClientEager implements JobClient {
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The Job's listening context for monitoring and job interaction */
+   private final JobListeningContext jobListeningContext;
+
+   /** Finalization code to run upon shutting down the JobClient */
+   private final List<Runnable> finalizers;
+
+   public JobClientEager(JobListeningContext jobListeningContext) {
+   this.jobListeningContext = jobListeningContext;
+   this.finalizers = new LinkedList<>();
+   }
+
+   /**
+* Blocks until the job finishes and returns the {@link 
JobExecutionResult}
+* @return the result of the job execution
+*/
+   @Override
+   public JobExecutionResult waitForResult() throws JobExecutionException {
+   LOG.info("Waiting for results of Job {}", 
jobListeningContext.getJobID());
+   JobExecutionResult result = 
JobClientActorUtils.awaitJobResult(jobListeningContext);
+   shutdown();
+   return result;
+   }
+
+   /**
+* Gets the job id that this client is bound to
+* @return The JobID of this JobClient
+*/
+   public JobID getJobID() {
+   return jobListeningContext.getJobID();
+   }
+
+   @Override
+   public boolean hasFinished() {
+   return jobListeningContext.getJobResultFuture().isCompleted();
+   }
+
+   /**
+* Cancels a job identified by the job id.
+* @throws Exception In case an error occurred.
+*/
+   @Override
+   public void cancel() throws Exception {
+   final ActorGateway jobClient = 
jobListeningContext.getJobClientGateway();
+
+   final Future<Object> response;
+   try {
+   response = jobClient.ask(
+   new JobClientActor.ClientMessage(
+   new 
JobManagerMessages.CancelJob(getJobID())),
+   AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+   } catch (final Exception e) {
+   throw new ProgramInvocationException("Failed to query 
the job manager gateway.", e);
--- End diff --

Why is this a `ProgramInvocationException`? It should rather be something like 
a `JobClientOperationException`.
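
A hedged sketch of such a type (the name comes from the comment above; no 
such class exists in Flink at this point):

```java
// Dedicated exception for client-side job operations (cancel, stop, queries).
public class JobClientOperationException extends Exception {

    public JobClientOperationException(String message) {
        super(message);
    }

    public JobClientOperationException(String message, Throwable cause) {
        super(message, cause);
    }
}
```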


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87053310
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
+ */
+public class JobClientEager implements JobClient {
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The Job's listening context for monitoring and job interaction */
+   private final JobListeningContext jobListeningContext;
+
+   /** Finalization code to run upon shutting down the JobClient */
+   private final List<Runnable> finalizers;
+
+   public JobClientEager(JobListeningContext jobListeningContext) {
+   this.jobListeningContext = jobListeningContext;
+   this.finalizers = new LinkedList<>();
+   }
+
+   /**
+* Blocks until the job finishes and returns the {@link 
JobExecutionResult}
+* @return the result of the job execution
+*/
+   @Override
+   public JobExecutionResult waitForResult() throws JobExecutionException {
+   LOG.info("Waiting for results of Job {}", 
jobListeningContext.getJobID());
+   JobExecutionResult result = 
JobClientActorUtils.awaitJobResult(jobListeningContext);
+   shutdown();
+   return result;
+   }
+
+   /**
+* Gets the job id that this client is bound to
+* @return The JobID of this JobClient
+*/
+   public JobID getJobID() {
+   return jobListeningContext.getJobID();
+   }
+
+   @Override
+   public boolean hasFinished() {
+   return jobListeningContext.getJobResultFuture().isCompleted();
+   }
+
+   /**
+* Cancels a job identified by the job id.
+* @throws Exception In case an error occurred.
+*/
+   @Override
+   public void cancel() throws Exception {
+   final ActorGateway jobClient = 
jobListeningContext.getJobClientGateway();
+
+   final Future<Object> response;
+   try {
+   response = jobClient.ask(
+   new JobClientActor.ClientMessage(
+   new 
JobManagerMessages.CancelJob(getJobID())),
+   AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+   } catch (final Exception e) {
+   throw new ProgramInvocationException("Failed to query 
the job manager gateway.", e);
+   }
+
+   final Object result = Await.result(response, 
AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+
+   if (result instanceof 

[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87176485
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
+ */
+public class JobClientEager implements JobClient {
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The Job's listening context for monitoring and job interaction */
+   private final JobListeningContext jobListeningContext;
+
+   /** Finalization code to run upon shutting down the JobClient */
+   private final List<Runnable> finalizers;
+
+   public JobClientEager(JobListeningContext jobListeningContext) {
+   this.jobListeningContext = jobListeningContext;
+   this.finalizers = new LinkedList<>();
+   }
+
+   /**
+* Blocks until the job finishes and returns the {@link 
JobExecutionResult}
+* @return the result of the job execution
+*/
+   @Override
+   public JobExecutionResult waitForResult() throws JobExecutionException {
+   LOG.info("Waiting for results of Job {}", 
jobListeningContext.getJobID());
--- End diff --

Typo: "results of job {}"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87054735
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
+ */
+public class JobClientEager implements JobClient {
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The Job's listening context for monitoring and job interaction */
+   private final JobListeningContext jobListeningContext;
+
+   /** Finalization code to run upon shutting down the JobClient */
+   private final List<Runnable> finalizers;
+
+   public JobClientEager(JobListeningContext jobListeningContext) {
+   this.jobListeningContext = jobListeningContext;
+   this.finalizers = new LinkedList<>();
--- End diff --

Not sure if we have to use a `LinkedList` here. In most cases `ArrayList` 
is faster (even though it might not make a big difference here).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87175803
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/JobClient.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.util.Map;
+
+/*
--- End diff --

No JavaDoc comment
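
The quoted block opens with `/*` instead of `/**`, so the javadoc tool ignores 
it; the fix is simply:

```java
/**
 * A Flink job client interface to interact with running Flink jobs.
 */
public interface JobClient {
    // ...
}
```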


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87098055
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/JobClientTest.java 
---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import akka.dispatch.Futures;
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Promise;
+
+import java.util.Collections;
+
+
+/**
+ * Tests the JobClient implementations.
+ *
+ * See also: JobRetrievalITCase
+ */
+public class JobClientTest {
+
+   private static boolean finalizeCalled;
+
+   private JobListeningContext listeningContext;
+   private JobID jobID;
+   private JobManagerMessages.JobResultSuccess successMessage;
+
+   private Runnable finalizer = new Runnable() {
+   @Override
+   public void run() {
+   finalizeCalled = true;
+   }
+   };
+
+   private Promise<Object> resultPromise;
+
+   @Before
+   public void beforeTest() throws Exception {
+   finalizeCalled = false;
+
+   this.jobID = JobID.generate();
+   this.listeningContext = Mockito.mock(JobListeningContext.class);
+   this.resultPromise = Futures.promise();
+   ActorGateway mockActorClientGateway = 
Mockito.mock(ActorGateway.class);
+   Mockito.when(listeningContext.getJobID()).thenReturn(jobID);
+   
Mockito.when(listeningContext.getJobClientGateway()).thenReturn(mockActorClientGateway);
+   
Mockito.when(listeningContext.getJobResultFuture()).thenReturn(resultPromise.future());
+   
Mockito.when(listeningContext.getClassLoader()).thenReturn(JobClientTest.class.getClassLoader());
+
+   this.successMessage = new JobManagerMessages.JobResultSuccess(
+   new SerializedJobExecutionResult(
+   jobID,
+   42,
+   Collections.singletonMap("key", new 
SerializedValue<Object>("value"))));
+   }
+
+   @Test(timeout = 1)
+   public void testEagerJobClient() throws Exception {
+
+   JobClient jobClient = new JobClientEager(listeningContext);
+
+   jobClient.addFinalizer(finalizer);
+
+   Assert.assertFalse(jobClient.hasFinished());
+
+   resultPromise.success(successMessage);
+
+   Assert.assertTrue(jobClient.hasFinished());
+
+   JobExecutionResult retrievedResult = jobClient.waitForResult();
+   Assert.assertNotNull(retrievedResult);
+
+   Assert.assertEquals(jobID, retrievedResult.getJobID());
+   Assert.assertEquals(42, retrievedResult.getNetRuntime());
+   Assert.assertEquals(1, 
retrievedResult.getAllAccumulatorResults().size());
+   Assert.assertEquals("value", 
retrievedResult.getAllAccumulatorResults().get("key"));
+
+   jobClient.shutdown();
+   Assert.assertTrue(finalizeCalled);
+
+   finalizeCalled = false;
+   jobClient.shutdown();
+   Assert.assertFalse(finalizeCalled);
+   }
+
+   @Test(timeout = 1)
+   public void testLazyJobClient() throws Exception {
+
+   ClusterClient mockedClusterClient = 
Mockito.mock(ClusterClient.class);
+   

[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87052197
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
+ */
+public class JobClientEager implements JobClient {
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

Why not private static? 
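
The conventional form the comment points to, one logger per class rather than 
per instance:

```java
private static final Logger LOG = LoggerFactory.getLogger(JobClientEager.class);
```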


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4272) Create a JobClient for job control and monitoring

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650981#comment-15650981
 ] 

ASF GitHub Bot commented on FLINK-4272:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87053575
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
+ */
+public class JobClientEager implements JobClient {
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The Job's listening context for monitoring and job interaction */
+   private final JobListeningContext jobListeningContext;
+
+   /** Finalization code to run upon shutting down the JobClient */
+   private final List<Runnable> finalizers;
+
+   public JobClientEager(JobListeningContext jobListeningContext) {
+   this.jobListeningContext = jobListeningContext;
+   this.finalizers = new LinkedList<>();
+   }
+
+   /**
+* Blocks until the job finishes and returns the {@link 
JobExecutionResult}
+* @return the result of the job execution
+*/
+   @Override
+   public JobExecutionResult waitForResult() throws JobExecutionException {
+   LOG.info("Waiting for results of Job {}", 
jobListeningContext.getJobID());
+   JobExecutionResult result = 
JobClientActorUtils.awaitJobResult(jobListeningContext);
+   shutdown();
+   return result;
+   }
+
+   /**
+* Gets the job id that this client is bound to
+* @return The JobID of this JobClient
+*/
+   public JobID getJobID() {
+   return jobListeningContext.getJobID();
+   }
+
+   @Override
+   public boolean hasFinished() {
+   return jobListeningContext.getJobResultFuture().isCompleted();
+   }
+
+   /**
+* Cancels a job identified by the job id.
+* @throws Exception In case an error occurred.
+*/
+   @Override
+   public void cancel() throws Exception {
+   final ActorGateway jobClient = 
jobListeningContext.getJobClientGateway();
+
+   final Future<Object> response;
+   try {
+   response = jobClient.ask(
+   new JobClientActor.ClientMessage(
+   new 
JobManagerMessages.CancelJob(getJobID())),
+   AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+   } catch (final Exception e) {
+   throw new 

[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87052088
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
--- End diff --

What is eager about this `JobClient`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87098367
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/JobClient.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.util.Map;
+
+/*
+ * A Flink job client interface to interact with running Flink jobs.
+ */
+public interface JobClient {
+
+   /**
+* Gets the JobID associated with this JobClient.
+*/
+   JobID getJobID();
+
+   /**
+* Returns a boolean indicating whether the job execution has finished.
+*/
+   boolean hasFinished() throws Exception;
--- End diff --

Should we maybe throw a `JobClientException` instead of the more general 
`Exception` here?
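
The narrower signatures would then read (hedged; `JobClientException` is the 
name from the comment, not an existing class):

```java
boolean hasFinished() throws JobClientException;

JobExecutionResult waitForResult() throws JobClientException;
```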


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4272) Create a JobClient for job control and monitoring

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650979#comment-15650979
 ] 

ASF GitHub Bot commented on FLINK-4272:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87049129
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
@@ -172,29 +174,40 @@ public JobExecutionResult executePlan(Plan plan) 
throws Exception {
 
// start the cluster for us
start();
-   }
-   else {
+   } else {
// we use the existing session
shutDownAtEnd = false;
}
 
-   try {
-   Configuration configuration = 
this.flink.configuration();
+   Configuration configuration = 
this.flink.configuration();
 
-   Optimizer pc = new Optimizer(new 
DataStatistics(), configuration);
-   OptimizedPlan op = pc.compile(plan);
+   Optimizer pc = new Optimizer(new DataStatistics(), 
configuration);
+   OptimizedPlan op = pc.compile(plan);
 
-   JobGraphGenerator jgg = new 
JobGraphGenerator(configuration);
-   JobGraph jobGraph = jgg.compileJobGraph(op, 
plan.getJobId());
+   JobGraphGenerator jgg = new 
JobGraphGenerator(configuration);
+   JobGraph jobGraph = jgg.compileJobGraph(op, 
plan.getJobId());
 
-   boolean sysoutPrint = 
isPrintingStatusDuringExecution();
-   return flink.submitJobAndWait(jobGraph, 
sysoutPrint);
-   }
-   finally {
-   if (shutDownAtEnd) {
-   stop();
+   boolean sysoutPrint = isPrintingStatusDuringExecution();
+
+
+   JobListeningContext jobListeningContext = 
flink.submitJob(jobGraph, sysoutPrint);
+   JobClientEager jobClient = new 
JobClientEager(jobListeningContext);
+
+   Runnable cleanup = new Runnable() {
+   @Override
+   public void run() {
+   if (shutDownAtEnd) {
--- End diff --

Can't we move this if condition out of the runnable and only add the clean-up 
runnable if `shutDownAtEnd == true`?
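
A hedged sketch of that restructuring, with the decision made once at 
registration time (exception handling from the earlier comment elided):

```java
if (shutDownAtEnd) {
    jobClient.addFinalizer(new Runnable() {
        @Override
        public void run() {
            stop(); // assumes stop() is adapted to no longer throw a checked exception
        }
    });
}
```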


> Create a JobClient for job control and monitoring 
> --
>
> Key: FLINK-4272
> URL: https://issues.apache.org/jira/browse/FLINK-4272
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The aim of this new feature is to expose a client to the user which allows 
> cancelling a running job, retrieving accumulators for a running job, or 
> performing other actions in the future. Let's call it {{JobClient}} for now 
> (although this clashes with the existing JobClient class, which could be 
> renamed to JobClientActorUtils instead).
> The new client should be returned from the {{ClusterClient}} class upon job 
> submission. The client should also be instantiable by users to retrieve the 
> JobClient with a JobID.
> We should expose the new JobClient to the Java and Scala APIs using a new 
> method on the {{ExecutionEnvironment}} / {{StreamExecutionEnvironment}} 
> called {{executeWithControl()}} (perhaps we can find a better name).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4272) Create a JobClient for job control and monitoring

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651003#comment-15651003
 ] 

ASF GitHub Bot commented on FLINK-4272:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87053672
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
+ */
+public class JobClientEager implements JobClient {
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The Job's listening context for monitoring and job interaction */
+   private final JobListeningContext jobListeningContext;
+
+   /** Finalization code to run upon shutting down the JobClient */
+   private final List<Runnable> finalizers;
+
+   public JobClientEager(JobListeningContext jobListeningContext) {
+   this.jobListeningContext = jobListeningContext;
+   this.finalizers = new LinkedList<>();
+   }
+
+   /**
+* Blocks until the job finishes and returns the {@link 
JobExecutionResult}
+* @return the result of the job execution
+*/
+   @Override
+   public JobExecutionResult waitForResult() throws JobExecutionException {
+   LOG.info("Waiting for results of Job {}", 
jobListeningContext.getJobID());
+   JobExecutionResult result = 
JobClientActorUtils.awaitJobResult(jobListeningContext);
+   shutdown();
+   return result;
+   }
+
+   /**
+* Gets the job id that this client is bound to
+* @return The JobID of this JobClient
+*/
+   public JobID getJobID() {
+   return jobListeningContext.getJobID();
+   }
+
+   @Override
+   public boolean hasFinished() {
+   return jobListeningContext.getJobResultFuture().isCompleted();
+   }
+
+   /**
+* Cancels a job identified by the job id.
+* @throws Exception In case an error occurred.
+*/
+   @Override
+   public void cancel() throws Exception {
+   final ActorGateway jobClient = 
jobListeningContext.getJobClientGateway();
+
+   final Future<Object> response;
+   try {
+   response = jobClient.ask(
+   new JobClientActor.ClientMessage(
+   new 
JobManagerMessages.CancelJob(getJobID())),
+   AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+   } catch (final Exception e) {
+   throw new 

[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87098937
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
@@ -911,6 +912,24 @@ public JobExecutionResult execute() throws Exception {
public abstract JobExecutionResult execute(String jobName) throws 
Exception;
 
/**
+* Triggers the program execution, just like {@code execute()} but does 
not block.
+* Instead, it returns a JobClient which can be used to interact with 
the running job.
+* @return A JobClient for job interaction.
+* @throws Exception Thrown if the program submission fails.
+*/
+   public JobClient executeWithControl() throws Exception {
--- End diff --

I think we should make these methods `PublicEvolving`
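
What that would look like (the annotation 
`org.apache.flink.annotation.PublicEvolving` already exists):

```java
@PublicEvolving
public JobClient executeWithControl() throws Exception { ... }
```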


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4272) Create a JobClient for job control and monitoring

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650995#comment-15650995
 ] 

ASF GitHub Bot commented on FLINK-4272:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87052872
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
+ */
+public class JobClientEager implements JobClient {
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The Job's listening context for monitoring and job interaction */
+   private final JobListeningContext jobListeningContext;
+
+   /** Finalization code to run upon shutting down the JobClient */
+   private final List<Runnable> finalizers;
+
+   public JobClientEager(JobListeningContext jobListeningContext) {
+   this.jobListeningContext = jobListeningContext;
+   this.finalizers = new LinkedList<>();
+   }
+
+   /**
+* Blocks until the job finishes and returns the {@link 
JobExecutionResult}
+* @return the result of the job execution
+*/
+   @Override
+   public JobExecutionResult waitForResult() throws JobExecutionException {
+   LOG.info("Waiting for results of Job {}", 
jobListeningContext.getJobID());
+   JobExecutionResult result = 
JobClientActorUtils.awaitJobResult(jobListeningContext);
+   shutdown();
+   return result;
+   }
+
+   /**
+* Gets the job id that this client is bound to
+* @return The JobID of this JobClient
+*/
+   public JobID getJobID() {
+   return jobListeningContext.getJobID();
+   }
+
+   @Override
+   public boolean hasFinished() {
+   return jobListeningContext.getJobResultFuture().isCompleted();
+   }
+
+   /**
+* Cancels a job identified by the job id.
+* @throws Exception In case an error occurred.
+*/
+   @Override
+   public void cancel() throws Exception {
+   final ActorGateway jobClient = 
jobListeningContext.getJobClientGateway();
+
+   final Future<Object> response;
+   try {
+   response = jobClient.ask(
+   new JobClientActor.ClientMessage(
+   new 
JobManagerMessages.CancelJob(getJobID())),
+   AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+   } catch (final Exception e) {
+   throw new 

[jira] [Commented] (FLINK-4272) Create a JobClient for job control and monitoring

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651000#comment-15651000
 ] 

ASF GitHub Bot commented on FLINK-4272:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87175973
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/JobClient.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.util.Map;
+
+/*
+ * A Flink job client interface to interact with running Flink jobs.
+ */
+public interface JobClient {
--- End diff --

`PublicEvolving`?
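
A minimal sketch of the suggested annotation, assuming the `org.apache.flink.annotation.PublicEvolving` annotation from the flink-annotations module is available here:

```
import org.apache.flink.annotation.PublicEvolving;

/**
 * A Flink job client interface to interact with running Flink jobs.
 */
@PublicEvolving
public interface JobClient {
    // ...
}
```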


> Create a JobClient for job control and monitoring 
> --
>
> Key: FLINK-4272
> URL: https://issues.apache.org/jira/browse/FLINK-4272
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The aim of this new feature is to expose a client to the user which allows 
> cancelling a running job, retrieving accumulators for a running job, or 
> performing other actions in the future. Let's call it {{JobClient}} for now 
> (although this clashes with the existing JobClient class, which could be 
> renamed to JobClientActorUtils instead).
> The new client should be returned from the {{ClusterClient}} class upon job 
> submission. The client should also be instantiable by users to retrieve the 
> JobClient with a JobID.
> We should expose the new JobClient to the Java and Scala APIs using a new 
> method on the {{ExecutionEnvironment}} / {{StreamExecutionEnvironment}} 
> called {{executeWithControl()}} (perhaps we can find a better name).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4272) Create a JobClient for job control and monitoring

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650975#comment-15650975
 ] 

ASF GitHub Bot commented on FLINK-4272:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87052088
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java 
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorUtils;
+import org.apache.flink.runtime.client.JobClientActor;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A client to interact with a running Flink job.
--- End diff --

What is eager about this `JobClient`?


> Create a JobClient for job control and monitoring 
> --
>
> Key: FLINK-4272
> URL: https://issues.apache.org/jira/browse/FLINK-4272
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> The aim of this new feature is to expose a client to the user which allows 
> cancelling a running job, retrieving accumulators for a running job, or 
> performing other actions in the future. Let's call it {{JobClient}} for now 
> (although this clashes with the existing JobClient class, which could be 
> renamed to JobClientActorUtils instead).
> The new client should be returned from the {{ClusterClient}} class upon job 
> submission. The client should also be instantiable by users to retrieve the 
> JobClient with a JobID.
> We should expose the new JobClient to the Java and Scala APIs using a new 
> method on the {{ExecutionEnvironment}} / {{StreamExecutionEnvironment}} 
> called {{executeWithControl()}} (perhaps we can find a better name).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87102646
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 ---
@@ -485,6 +485,52 @@ abstract class FlinkMiniCluster(
   }
 
   @throws(classOf[JobExecutionException])
+  def submitJob(
+jobGraph: JobGraph,
+printUpdates: Boolean)
--- End diff --

The current Flink Scala style indents parameters twice and the return type 
once:

```
def foobar(
x: Int,
y: Float)
  : Double = {
  ...
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87098405
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/JobClient.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.util.Map;
+
+/*
+ * A Flink job client interface to interact with running Flink jobs.
+ */
+public interface JobClient {
+
+   /**
+* Gets the JobID associated with this JobClient.
+*/
+   JobID getJobID();
+
+   /**
+* Returns a boolean indicating whether the job execution has finished.
+*/
+   boolean hasFinished() throws Exception;
+
+   /**
+* Blocks until the result of the job execution is returned.
+*/
+   JobExecutionResult waitForResult() throws Exception;
+
+   /**
+* Gets the accumulator map of a running job.
+*/
+   Map<String, Object> getAccumulators() throws Exception;
+
+   /**
+* Cancels a running job.
+*/
+   void cancel() throws Exception;
+
+   /**
+* Stops a running job if the job supports stopping.
+*/
+   void stop() throws Exception;
+
+   /**
+* Adds a Runnable to this JobClient to be called
+* when the client is shut down. Runnables are called
+* in the order they are added.
--- End diff --

param and throws description missing.
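
A sketch of a complete Javadoc for this method; the diff above is cut off before the signature, so the method name `addFinalizer` and its parameter are assumptions:

```
/**
 * Adds a Runnable to this JobClient to be called when the client
 * is shut down. Runnables are called in the order they are added.
 *
 * @param finalizer the finalization code to run on shutdown
 * @throws Exception if the finalizer cannot be registered
 */
void addFinalizer(Runnable finalizer) throws Exception;
```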


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87100841
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java 
---
@@ -333,4 +354,31 @@ protected boolean isClientConnected() {
return client != ActorRef.noSender();
}
 
+   public static class ClientMessage implements Serializable {
--- End diff --

serial version UID missing
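
A minimal sketch of the fix; the concrete UID value is arbitrary, any fixed long works:

```
public static class ClientMessage implements Serializable {

    // fixed UID keeps serialized instances compatible across class changes
    private static final long serialVersionUID = 1L;

    // ...
}
```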


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87184404
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/JobClient.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.util.Map;
+
+/*
+ * A Flink job client interface to interact with running Flink jobs.
+ */
+public interface JobClient {
--- End diff --

Would it make sense to be able to retrieve the `ClusterClient` from the 
`JobClient`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87104003
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
 ---
@@ -91,11 +91,9 @@ public void testFaultyAccumulator() throws Exception {
try {
env.execute();
fail("Should have failed.");
-   } catch (ProgramInvocationException e) {
-   Assert.assertTrue("Exception should be passed:",
-   e.getCause() instanceof JobExecutionException);
+   } catch (JobExecutionException e) {
--- End diff --

Aren't we changing the API by no longer throwing a 
`ProgramInvocationException` and instead throwing a `JobExecutionException`? I 
thought that thrown exceptions are part of the defined interface.
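
For comparison, a sketch of a catch block that would keep the old contract, assuming the `JobExecutionException` is still attached as the cause:

```
try {
    env.execute();
    fail("Should have failed.");
} catch (ProgramInvocationException e) {
    // old contract: callers catch ProgramInvocationException and
    // find the JobExecutionException as its cause
    Assert.assertTrue("Exception should be passed:",
        e.getCause() instanceof JobExecutionException);
}
```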


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87102297
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
 ---
@@ -142,4 +209,66 @@ private ActorGateway getJobManager() throws JobRetrievalException {
throw new JobRetrievalException(jobID, "Couldn't retrieve leading JobManager.", e);
}
}
+
+   /**
+    * Reconstructs the class loader by first requesting information about it at the JobManager
+    * and then downloading missing jar files.
+    * @param jobID id of job
+    * @param jobManager gateway to the JobManager
+    * @param config the flink configuration
+    * @return A classloader that should behave like the original classloader
+    * @throws JobRetrievalException if anything goes wrong
+    */
+   private static ClassLoader retrieveClassLoader(
+           JobID jobID,
+           ActorGateway jobManager,
+           Configuration config)
+           throws JobRetrievalException {
+
+       final Object jmAnswer;
+       try {
+           jmAnswer = Await.result(
+               jobManager.ask(
+                   new JobManagerMessages.RequestClassloadingProps(jobID),
+                   AkkaUtils.getDefaultTimeoutAsFiniteDuration()),
+               AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+       } catch (Exception e) {
+           throw new JobRetrievalException(jobID, "Couldn't retrieve class loading properties from JobManager.", e);
+       }
+
+       if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
+           JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
+
+           Option<String> jmHost = jobManager.actor().path().address().host();
+           String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
+           InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
+           final BlobCache blobClient = new BlobCache(serverAddress, config);
+
+           final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
+           final List<URL> requiredClasspaths = props.requiredClasspaths();
+
+           final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+
+           int pos = 0;
+           for (BlobKey blobKey : props.requiredJarFiles()) {
+               try {
+                   allURLs[pos++] = blobClient.getURL(blobKey);
+               } catch (Exception e) {
+                   blobClient.shutdown();
+                   throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
--- End diff --

Exception `e` is swallowed.
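
A sketch of the fix, reusing the three-argument constructor already used earlier in this file so the root failure is preserved:

```
} catch (Exception e) {
    blobClient.shutdown();
    // pass e along as the cause instead of swallowing it
    throw new JobRetrievalException(jobID,
        "Failed to download BlobKey " + blobKey, e);
}
```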


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...

2016-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2732#discussion_r87187863
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/JobClientLazy.java 
---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobClient;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobExecutionException;
+
+import java.util.Map;
+
+/**
+ * A detached job client which lazily initiates the cluster connection.
+ */
+public class JobClientLazy implements JobClient {
--- End diff --

The distinction between `JobClientEager` and `JobClientLazy` feels a little 
bit clumsy. Can't we get rid of one of them and simply have a `JobClientImpl`? 
The only place where `JobClientLazy` is returned is when calling 
`submitJobDetached`. I think in this case, you don't expect to get a 
`JobClient` back because it is submitted in detached mode.
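
A rough sketch of that direction, with a single (hypothetical) `JobClientImpl` and detached submission not handing out a client at all:

```
// one eager implementation replaces JobClientEager and JobClientLazy
public class JobClientImpl implements JobClient {

    private final JobListeningContext jobListeningContext;

    public JobClientImpl(JobListeningContext jobListeningContext) {
        this.jobListeningContext = jobListeningContext;
    }

    // waitForResult(), cancel(), etc. as in the current JobClientEager
}
```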


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   >