[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunction

2017-09-20 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174288#comment-16174288
 ] 

Bowen Li commented on FLINK-7635:
-

[~foxss] [~aljoscha]

After taking a quick look at the code structure, I decided to split the task 
into two: this ticket for ProcessWindowFunction, and FLINK-7660 for 
ProcessAllWindowFunction.

I'll work on this ticket first, so that I can focus on one thing while 
getting familiar with the code, and also iterate on the code and address 
feedback faster. After finishing this ticket, I should be able to complete 
FLINK-7660 fairly quickly.

> Support sideOutput in ProcessWindowFunction
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Scala API
> Reporter: Chen Qin
> Assignee: Bowen Li
> Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented side output on the ProcessFunction Context. It would be nice to 
> add support to ProcessWindow and ProcessAllWindow functions as well. See the 
> [email thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html].
> [~aljoscha] I thought this would be a good warm-up task for people to learn 
> how window functions work in general. Otherwise, feel free to assign it back 
> to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7660) Support sideOutput in ProcessAllWindowFunction

2017-09-20 Thread Bowen Li (JIRA)
Bowen Li created FLINK-7660:
---

 Summary: Support sideOutput in ProcessAllWindowFunction
 Key: FLINK-7660
 URL: https://issues.apache.org/jira/browse/FLINK-7660
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Scala API
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.4.0


Add side output support to ProcessAllWindow functions.

This is a sibling ticket for FLINK-7635





[jira] [Updated] (FLINK-7635) Support sideOutput in ProcessWindowFunction

2017-09-20 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7635:

Summary: Support sideOutput in ProcessWindowFunction  (was: Support 
sideOutput in ProcessWindowFunction & ProcessAllWindowFunction)



[jira] [Commented] (FLINK-7571) Execution of TableSources with Time Indicators fails

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174248#comment-16174248
 ] 

ASF GitHub Bot commented on FLINK-7571:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4635
  
Hi @fhueske, thanks for the work! It is a very important fix! I'm fine 
with the changes.

+1 to merge


> Execution of TableSources with Time Indicators fails
> 
>
> Key: FLINK-7571
> URL: https://issues.apache.org/jira/browse/FLINK-7571
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
>
> The translation of queries that include a TableSource with time indicators 
> fails during code generation because field names and field indices are 
> not adjusted for the time indicators.





[jira] [Comment Edited] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173959#comment-16173959
 ] 

Kent Murra edited comment on FLINK-7657 at 9/21/17 12:34 AM:
-

I've noticed that the scope of the problem is somewhat larger than what I 
initially found. For example, integer literals in a SQL statement are parsed 
as BigDecimal and are not converted when they are turned into a Literal. When 
running the test cases, I see Literals that have a BigDecimal value and a 
BasicTypeInfo.INT type. This tends to work because the underlying engine 
treats Integer and BigDecimal as comparable; however, my concern is that it 
does not match the documentation. I can see people implementing 
FilterableTableSource being surprised by this. At the same time, fixing the 
issue might break people who depend on the old behavior.

I think that, for testing purposes, the ideal approach is to have the literal 
also encode the type so that the test cases can verify types as well. 
However, the scope of such a change is very large, and I'm not sure the 
maintainers would want to review such a patch. I can limit the scope of my 
change, but my concern is that it would not be a *good* fix and more of a 
duct-tape solution.

Can I get some guidance on whether I should expand the scope or just limit it 
to the time-based literals as originally reported?

Let me know if there are any questions about this.
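
The mismatch described above (a value whose runtime class disagrees with its declared type) can be illustrated with a small, self-contained Java sketch. This is not Flink code: `TypedLiteral`, `isConsistent` and `normalized` are hypothetical names, and a plain `Class<?>` stands in for Flink's type information. It only shows, under those assumptions, how a literal that carries its declared type could be checked in tests and converted for the integer and date cases mentioned in this thread.

```java
import java.math.BigDecimal;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

/** Hypothetical helper (not part of Flink): a literal value plus its declared Java type. */
final class TypedLiteral {
    final Object value;
    final Class<?> declaredType;

    TypedLiteral(Object value, Class<?> declaredType) {
        this.value = value;
        this.declaredType = declaredType;
    }

    /** True when the runtime class of the value matches the declared type. */
    boolean isConsistent() {
        return declaredType.isInstance(value);
    }

    /** Converts the two mismatches discussed in the thread; anything else is rejected. */
    TypedLiteral normalized() {
        if (isConsistent()) {
            return this;
        }
        if (value instanceof BigDecimal && declaredType == Integer.class) {
            // intValueExact throws ArithmeticException instead of silently truncating.
            return new TypedLiteral(((BigDecimal) value).intValueExact(), declaredType);
        }
        if (value instanceof Calendar && declaredType == Date.class) {
            return new TypedLiteral(((Calendar) value).getTime(), declaredType);
        }
        String actual = (value == null) ? "null" : value.getClass().getName();
        throw new IllegalArgumentException(
            "Cannot convert " + actual + " to " + declaredType.getName());
    }
}

final class TypedLiteralDemo {
    public static void main(String[] args) {
        TypedLiteral number = new TypedLiteral(new BigDecimal("42"), Integer.class);
        TypedLiteral date =
            new TypedLiteral(new GregorianCalendar(2017, Calendar.MAY, 1), Date.class);
        // Each line prints the consistency check before and after normalization.
        System.out.println(number.isConsistent() + " -> " + number.normalized().isConsistent());
        System.out.println(date.isConsistent() + " -> " + date.normalized().isConsistent());
    }
}
```

A check like `isConsistent()` is the kind of type verification a test case could assert on; whether normalization belongs in the planner or in the table source is exactly the scoping question raised in the comment.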



[jira] [Comment Edited] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173959#comment-16173959
 ] 

Kent Murra edited comment on FLINK-7657 at 9/20/17 10:48 PM:
-

I've noticed that the scope of the problem is somewhat larger than what I 
initially found. For example, integer literals in a SQL statement are parsed 
as BigDecimal and are not converted when they are turned into a Literal. When 
running the test cases, I see Literals that have a BigDecimal value and a 
BasicTypeInfo.INT type. This tends to work because the underlying engine 
treats Integer and BigDecimal as comparable; however, my concern is that it 
does not match the documentation. I can see people implementing 
FilterableTableSource being surprised by this. At the same time, fixing the 
issue might break people who depend on the old behavior.

I think that, for testing purposes, the ideal approach is to have the literal 
also encode the type so that the test cases can verify types as well. 
However, the scope of such a change is very large, and I'm not sure the 
maintainers would want to review such a patch. I can limit the scope of my 
change, but my concern is that it would not be a *good* fix and more of a 
duct-tape solution.

Can I get some guidance on the approach I should take?

Let me know if there are any questions about this.



[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173959#comment-16173959
 ] 

Kent Murra commented on FLINK-7657:
---

I've noticed that the scope of the problem is somewhat larger than what I 
initially found. For example, integer values are represented as BigDecimal, 
and when running the test cases I see Literals that have a BigDecimal value 
and a BasicTypeInfo.INT type. This tends to work because the underlying 
engine treats Integer and BigDecimal as comparable; however, my concern is 
that it does not match the documentation. I can see people implementing 
FilterableTableSource being surprised by this. At the same time, fixing the 
issue might break people who depend on the old behavior.

I think that, for testing purposes, the ideal approach is to have the literal 
also encode the type so that the test cases can verify types as well. 
However, the scope of such a change is very large, and I'm not sure the 
maintainers would want to review such a patch. I can limit the scope of my 
change, but my concern is that it would not be a *good* fix and more of a 
duct-tape solution.

Can I get some guidance on the approach I should take?

Let me know if there are any questions about this.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Kent Murra
>Assignee: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it throws an exception 
> (attached below). As a result, any SQL query with a timestamp, date, or 
> time literal cannot be executed if any table source implements 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushFilterIntoTableSourceScanRule, args [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], fields:(data, last_updated))]
>   at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to java.util.Date
>   at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 

[jira] [Commented] (FLINK-7552) Extend SinkFunction interface with SinkContext

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173936#comment-16173936
 ] 

ASF GitHub Bot commented on FLINK-7552:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4616
  
+1


> Extend SinkFunction interface with SinkContext
> --
>
> Key: FLINK-7552
> URL: https://issues.apache.org/jira/browse/FLINK-7552
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface 
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>     /**
>      * Function for standard sink behaviour. This function is called for every record.
>      *
>      * @param value The input record.
>      * @throws Exception
>      * @deprecated Use {@link #invoke(SinkContext, Object)}.
>      */
>     @Deprecated
>     default void invoke(IN value) throws Exception {
>     }
>     /**
>      * Writes the given value to the sink. This function is called for every record.
>      *
>      * @param context Additional context about the input record.
>      * @param value The input record.
>      * @throws Exception
>      */
>     default void invoke(SinkContext context, IN value) throws Exception {
>         invoke(value);
>     }
>     /**
>      * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
>      * an input record.
>      *
>      * @param  The type of elements accepted by the sink.
>      */
>     @Public // Interface might be extended in the future with additional methods.
>     interface SinkContext {
>         /**
>          * Returns the timestamp of the current input record.
>          */
>         long timestamp();
>     }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow 
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a 
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to 
> timestamps.
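
The backward-compatibility idea in the proposal above, where a new `default` method delegates to the old one, can be tried out without Flink. The following is a minimal sketch only: `DemoSink`, `DemoContext`, `LegacySink` and `TimestampedSink` are hypothetical names invented for illustration, and the timestamp value is arbitrary. It shows that an implementation written against the old single-argument method keeps working when callers switch to the context-aware method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the proposed interface; names are illustrative, not Flink's. */
interface DemoSink<IN> {

    /** Old-style entry point, kept so that existing implementations still compile. */
    @Deprecated
    default void invoke(IN value) throws Exception {}

    /** New-style entry point; by default it falls back to the old method. */
    default void invoke(DemoContext context, IN value) throws Exception {
        invoke(value);
    }

    /** Extra per-record information handed to the sink. */
    interface DemoContext {
        long timestamp();
    }
}

/** Written against the old interface: overrides only invoke(IN). */
final class LegacySink implements DemoSink<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value) {
        seen.add(value);
    }
}

/** Written against the new interface: also reads the record timestamp. */
final class TimestampedSink implements DemoSink<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(DemoSink.DemoContext context, String value) {
        seen.add(context.timestamp() + ":" + value);
    }
}

final class DemoSinkMain {
    public static void main(String[] args) throws Exception {
        DemoSink.DemoContext ctx = () -> 1505865600000L; // arbitrary example timestamp
        LegacySink legacy = new LegacySink();
        TimestampedSink modern = new TimestampedSink();
        // The caller only ever uses the new two-argument method.
        legacy.invoke(ctx, "a");
        modern.invoke(ctx, "a");
        System.out.println(legacy.seen);
        System.out.println(modern.seen);
    }
}
```

One consequence of this design is that an implementation overriding neither method still compiles and silently drops records, which is why documentation on the new method matters.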





[jira] [Commented] (FLINK-7552) Extend SinkFunction interface with SinkContext

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173931#comment-16173931
 ] 

ASF GitHub Bot commented on FLINK-7552:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4616#discussion_r140108752
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
@@ -31,10 +31,50 @@
 public interface SinkFunction<IN> extends Function, Serializable {
 
     /**
-     * Function for standard sink behaviour. This function is called for every record.
+     * @deprecated Use {@link #invoke(Object, Context)}.
+     */
+    @Deprecated
+    default void invoke(IN value) throws Exception {}
+
+    /**
+     * Writes the given value to the sink. This function is called for every record.
+     *
+     * You have to override this method when implementing a {@code SinkFunction}, this is a
+     * {@code default} method for backward compatibility with the old-style method only.
      *
      * @param value The input record.
-     * @throws Exception
+     * @param context Additional context about the input record.
+     *
+     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+     *                   to fail and may trigger recovery.
      */
-    void invoke(IN value) throws Exception;
+    default void invoke(IN value, Context context) throws Exception {
+        invoke(value);
+    }
+
+    /**
+     * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about

Is the link in this comment well-formed?




[jira] [Created] (FLINK-7659) Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest

2017-09-20 Thread Ted Yu (JIRA)
Ted Yu created FLINK-7659:
-

 Summary: Unprotected access to inProgress in 
JobCancellationWithSavepointHandlers#handleNewRequest
 Key: FLINK-7659
 URL: https://issues.apache.org/jira/browse/FLINK-7659
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


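Here is the related code, which modifies inProgress without holding the lock:
{code}
  } finally {
    inProgress.remove(jobId);
  }
{code}
A little lower, in another finally block, there is:
{code}
  synchronized (lock) {
    if (!success) {
      inProgress.remove(jobId);
{code}
which is correct because the access happens inside the synchronized block.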
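
As an illustration of the pattern being asked for, here is a minimal plain-Java sketch in which every read and write of the shared set goes through the same lock, including the removal performed in a `finally` block. `InProgressTracker` and its methods are hypothetical names for this example; the sketch makes no claim about how the actual handler is structured.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker (not Flink's handler): every access to inProgress holds the same lock. */
final class InProgressTracker {
    private final Object lock = new Object();
    private final Set<String> inProgress = new HashSet<>();

    /** Returns false if the job is already being handled. */
    boolean tryStart(String jobId) {
        synchronized (lock) {
            return inProgress.add(jobId);
        }
    }

    /** Safe to call from a finally block on any thread. */
    void finish(String jobId) {
        synchronized (lock) {
            inProgress.remove(jobId);
        }
    }

    int size() {
        synchronized (lock) {
            return inProgress.size();
        }
    }
}

final class InProgressTrackerDemo {
    public static void main(String[] args) throws InterruptedException {
        InProgressTracker tracker = new InProgressTracker();
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            String jobId = "job-" + i;
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 10_000; n++) {
                    if (tracker.tryStart(jobId)) {
                        try {
                            // The work for this request would happen here.
                        } finally {
                            tracker.finish(jobId);
                        }
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        // All jobs were finished, so nothing should remain in progress.
        System.out.println(tracker.size());
    }
}
```

Hiding the set behind methods that take the lock themselves means a caller cannot forget the `synchronized` block, which is the mistake the first snippet in the report appears to make.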





[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173814#comment-16173814
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140089533
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MultisetRelDataType.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.MultisetSqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+class MultisetRelDataType(
+    val typeInfo: TypeInformation[_],
+    elementType: RelDataType,
+    isNullable: Boolean)
+  extends MultisetSqlType(
+    elementType,
+    isNullable) {
+
+  override def toString = s"MULTISET($typeInfo)"
--- End diff --

Done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173810#comment-16173810
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140089018
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
 ---
@@ -329,6 +329,35 @@ class AggregateITCase(
   }
 
   @Test
+  def testTumbleWindowAggregateWithCollect(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery =
+  "SELECT b, COLLECT(b)" +
--- End diff --

Updated the documentation.

Table API ticket created: 
https://issues.apache.org/jira/browse/FLINK-7658?filter=-1


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7658) Support COLLECT Aggregate function in Flink TABLE API

2017-09-20 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-7658:
-

 Summary: Support COLLECT Aggregate function in Flink TABLE API
 Key: FLINK-7658
 URL: https://issues.apache.org/jira/browse/FLINK-7658
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Shuyi Chen
Assignee: Shuyi Chen






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-20 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140083074
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
+override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E]()
+acc.f0 = new util.HashMap[E, Integer]()
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.containsKey(value)) {
+val add = (x: Integer, y: Integer) => x + y
--- End diff --

yes, removed.
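
For readers following the thread, here is a minimal Java sketch of the multiset counting that the diff above is building toward: one shared adder function plus `Map.merge`, so no local lambda is needed. The class name `MultisetAccumulatorSketch` is hypothetical and this is not the code from the pull request; it only assumes that the accumulator maps each element to its occurrence count and ignores nulls, as the diff suggests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical stand-in for the accumulator logic; not the code from the pull request. */
final class MultisetAccumulatorSketch<E> {

    // One shared adder, analogous to the addFunction field in the diff.
    private static final BiFunction<Integer, Integer, Integer> ADD = Integer::sum;

    // Element -> number of occurrences seen so far.
    private final Map<E, Integer> counts = new HashMap<>();

    /** Counts one occurrence of value; null values are ignored, as in the diff. */
    void accumulate(E value) {
        if (value != null) {
            // merge inserts 1 for a new key, otherwise applies ADD(oldCount, 1).
            counts.merge(value, 1, ADD);
        }
    }

    Map<E, Integer> getValue() {
        return counts;
    }

    public static void main(String[] args) {
        MultisetAccumulatorSketch<String> acc = new MultisetAccumulatorSketch<>();
        for (String s : new String[] {"a", "b", "a", null}) {
            acc.accumulate(s);
        }
        System.out.println(acc.getValue());
    }
}
```

With `merge`, the `containsKey` branch and any per-call lambda become unnecessary, which is one way to read the review remark.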


---


[jira] [Commented] (FLINK-7571) Execution of TableSources with Time Indicators fails

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173670#comment-16173670
 ] 

ASF GitHub Bot commented on FLINK-7571:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4635
  
@wuchong can you have a look at this PR? 
It fixes the code generation for table sources with time attributes. 
Thanks!


> Execution of TableSources with Time Indicators fails
> 
>
> Key: FLINK-7571
> URL: https://issues.apache.org/jira/browse/FLINK-7571
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
>
> The translation of queries that include a TableSource with time indicators 
> fails during code generation because field names and field indices are 
> not adjusted for the time indicators.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173652#comment-16173652
 ] 

ASF GitHub Bot commented on FLINK-7486:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4628#discussion_r140021958
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -366,6 +425,34 @@ public String call(String s) {
}
 
/**
+* Internal class that stores the parsed information about soft constraint
+* It encapsulates fields:
+*  1. Host attribute name
+*  2. Expected number of unique values for given host attribute
+*  3. A callback coTaskGetter used while evaluating balancing constraint
+*/
+   static class BalancedHostAttrConstraintParams {
--- End diff --

Please let us make this configuration class immutable.   Mark the 
`hostAttr` and `numOfExpectedUniqueValues` fields as final.  Move 
`coTasksGetter` elsewhere.
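
A minimal sketch of what an immutable version of this class could look like. The `Sketch` suffix marks the class as hypothetical; only the two field names come from the comment above, and the argument validation is an assumption rather than something the pull request is known to do.

```java
import java.util.Objects;

/** Hypothetical immutable variant; only the field names come from the review comment. */
final class BalancedHostAttrConstraintParamsSketch {

    private final String hostAttr;
    private final int numOfExpectedUniqueValues;

    BalancedHostAttrConstraintParamsSketch(String hostAttr, int numOfExpectedUniqueValues) {
        // Validation is an assumption added for illustration.
        this.hostAttr = Objects.requireNonNull(hostAttr, "hostAttr");
        if (numOfExpectedUniqueValues <= 0) {
            throw new IllegalArgumentException("numOfExpectedUniqueValues must be positive");
        }
        this.numOfExpectedUniqueValues = numOfExpectedUniqueValues;
    }

    String getHostAttr() {
        return hostAttr;
    }

    int getNumOfExpectedUniqueValues() {
        return numOfExpectedUniqueValues;
    }
}
```

Because both fields are final and there are no setters, instances can be shared freely; the `coTasksGetter` callback is deliberately absent here.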


> flink-mesos: Support for adding unique attribute / group_by attribute 
> constraints
> -
>
> Key: FLINK-7486
> URL: https://issues.apache.org/jira/browse/FLINK-7486
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.3.2
>Reporter: Bhumika Bayani
>Assignee: Bhumika Bayani
>
> In our setup, we have multiple mesos-workers. In spite of this, the Flink 
> application master most of the time ends up spawning all task managers on 
> the same mesos-worker.
> We intend to ensure HA of task managers. We would like to make sure each 
> task manager runs on a different mesos-worker, and on a mesos-worker that 
> does not share the AZ attribute with earlier task manager instances.
> Netflix Fenzo supports adding UniqueHostAttribute and BalancedHostAttribute 
> constraints. Flink-mesos should also enable us to add these kinds of 
> constraints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173649#comment-16173649
 ] 

ASF GitHub Bot commented on FLINK-7486:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4628#discussion_r140022455
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -155,7 +155,7 @@ public int getPorts() {
 
@Override
public List 
getSoftConstraints() {
-   return null;
+   return params.softConstraints();
--- End diff --

Let's pass the soft constraints to the launchable mesos worker in its 
constructor, rather than fetching them from the params. The params aren't a 
good vehicle for this information.
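
A rough sketch of the constructor-injection shape suggested here. `LaunchableWorkerSketch` and its type parameter `C` are hypothetical placeholders for the real worker class and the Fenzo constraint type; the defensive copy is an assumption added for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical worker that receives its soft constraints at construction time. */
final class LaunchableWorkerSketch<C> {

    private final List<C> softConstraints;

    LaunchableWorkerSketch(List<C> softConstraints) {
        // Copy defensively so later changes to the caller's list do not leak in.
        this.softConstraints =
            Collections.unmodifiableList(new ArrayList<>(softConstraints));
    }

    /** Returns the constraints handed to the constructor, never null. */
    List<C> getSoftConstraints() {
        return softConstraints;
    }
}
```

The worker then no longer depends on the parameters object for this information, and the getter can never return `null`.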





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173651#comment-16173651
 ] 

ASF GitHub Bot commented on FLINK-7486:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4628#discussion_r140038861
  
--- Diff: docs/ops/deployment/mesos.md ---
@@ -226,6 +226,10 @@ When running Flink with Marathon, the whole Flink 
cluster including the job mana
 Takes a comma-separated list of key:value pairs corresponding to the 
attributes exposed by the target
 mesos agents.  Example: `az:eu-west-1a,series:t2`
 
+`mesos.constraints.soft.balanced`: Soft Constraints for balancing the 
tasks across mesos based on agent attributes (**DEFAULT**: None).
+Takes a comma-separated list of key=value pairs. Key corresponds to host 
attribute and value is number of expected unique values for given host 
attribute.
+Example: `az=3,rack_id=4`
--- End diff --

This syntax seems confusing since it reads like a constraint that `az` will 
be equal to `3`.  How about: `az(3),rack_id(4)`.

Feel free to use a regular expression to easily parse it (just a 
suggestion): `(?:(\w+)\((\d+)\)(?=,|$))`
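
As an illustration of the suggested syntax, the following Java sketch applies the regular expression above to a value such as `az(3),rack_id(4)`. The class and method names are hypothetical. Note that `Matcher.find()` silently skips text that does not match, so a real parser would probably want stricter validation of the whole string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for values such as "az(3),rack_id(4)". */
final class BalancedConstraintParserSketch {

    // The expression from the comment above, escaped for a Java string literal.
    private static final Pattern ENTRY =
        Pattern.compile("(?:(\\w+)\\((\\d+)\\)(?=,|$))");

    /** Returns attribute name -> expected number of unique values, in input order. */
    static Map<String, Integer> parse(String value) {
        Map<String, Integer> result = new LinkedHashMap<>();
        Matcher m = ENTRY.matcher(value);
        while (m.find()) {
            // Group 1 is the attribute name, group 2 the count inside the parentheses.
            result.put(m.group(1), Integer.parseInt(m.group(2)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("az(3),rack_id(4)"));
    }
}
```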





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173650#comment-16173650
 ] 

ASF GitHub Bot commented on FLINK-7486:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4628#discussion_r140051191
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -663,6 +666,7 @@ private void taskTerminated(Protos.TaskID taskID, 
Protos.TaskStatus status) {
// 

 
private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID 
taskID) {
+   setCoTaskGetter();
--- End diff --

I believe that this approach is violating an Akka rule by passing a direct 
reference to the RM object to the launch coordinator.   I understand the 
challenge to be that Fenzo's `BalancedHostAttrConstraint` expects a 
`coTasksGetter` function 
([ref](http://netflix.github.io/Fenzo/fenzo-core/com/netflix/fenzo/plugins/BalancedHostAttrConstraint.html#BalancedHostAttrConstraint-com.netflix.fenzo.functions.Func1-java.lang.String-int-)).
   But that function cannot keep a reference to the RM object.  

 An imperfect solution would be to pass the `taskIds` to the 
`LaunchableMesosWorker` constructor; imperfect because the list wouldn't be 
dynamic.

In truth we want all tasks to be balanced and so the getter is actually 
overkill.  If you look at the implementation of `BalancedHostAttrConstraint` 
([ref](https://github.com/Netflix/Fenzo/blob/master/fenzo-core/src/main/java/com/netflix/fenzo/plugins/BalancedHostAttrConstraint.java))
 it uses the function output to lookup tasks in the `TaskTrackerState`.   A 
custom constraint could simply use all tasks in `TaskTrackerState`  without 
need for a `coTasksGetter`.

Once we get this sorted out, some of the code should be moved to a utility 
class.





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4657: [FLINK-7600][kinesis] shorten delay of KinesisProducerCon...

2017-09-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4657
  
@tzulitai Thank you, Gordon!


---


[jira] [Commented] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173629#comment-16173629
 ] 

ASF GitHub Bot commented on FLINK-7508:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4656
  
@tzulitai Thank you! 


> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0
>
>
> KinesisProducerLibrary (KPL) 0.10.x had been using a 
> One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which 
> is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which will use a thread pool. This hugely improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses per-request mode. We 
> should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508
> Benchmarking I did:
> * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR 
> cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink 
> job generates about 21million UserRecords, which means that we generated a 
> test load of 21million UserRecords at the first minute of each hour.
> * Criteria: Test KPL throughput per minute. Since the default RecordTTL for 
> KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL 
> within a minute, or we will see UserRecord expiration errors.
> * One-New-Thread-Per-Request model: max throughput is about 2 million 
> UserRecords per min; it doesn't go beyond that because CPU utilization goes 
> to 100%, at which point everything stopped working and the Flink job crashed.
> * Thread-Pool model with pool size of 10: it sends out 21million UserRecords 
> within 30 sec without any UserRecord expiration errors. The average peak CPU 
> utilization is about 20% - 30%. So 21million UserRecords/min is not the max 
> throughput of thread-pool model. We didn't go any further because 1) this 
> throughput is already a couple times more than what we really need, and 2) we 
> don't have a quick way of increasing the test load.
> Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. 
> [~tzulitai] What do you think?
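
To make the difference between the two threading models concrete, here is a small Java analogy that uses only the standard library. It is not KPL code and does not reproduce the benchmark above; the class name and the no-op "requests" are assumptions for illustration. It shows that a fixed pool serves many requests with a bounded number of threads, whereas a thread-per-request design would create one thread for each of them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Standard-library analogy only; this is not how KPL is implemented. */
final class ThreadingModelSketch {

    /** Runs no-op requests on a fixed pool and returns how many distinct threads served them. */
    static int distinctThreadsWithPool(int requests, int poolSize) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < requests; i++) {
            // Each "request" only records which worker thread executed it.
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // With a pool of 10, at most 10 threads are ever created for 1000 requests.
        System.out.println(distinctThreadsWithPool(1000, 10));
    }
}
```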



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7600) shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to avoid updateCredentials Exception

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173631#comment-16173631
 ] 

ASF GitHub Bot commented on FLINK-7600:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4657
  
@tzulitai Thank you, Gordon!


> shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to 
> avoid updateCredentials Exception
> ---
>
> Key: FLINK-7600
> URL: https://issues.apache.org/jira/browse/FLINK-7600
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> We saw the following warning in the Flink log:
> {code:java}
> 2017-08-11 02:33:24,473 WARN  
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon
>   - Exception during updateCredentials
> java.lang.InterruptedException: sleep interrupted
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$5.run(Daemon.java:316)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> According to the discussion in 
> https://github.com/awslabs/amazon-kinesis-producer/issues/10, setting the 
> delay to 100 will fix this issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7603) Support within clause in MatchRecognize

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173626#comment-16173626
 ] 

ASF GitHub Bot commented on FLINK-7603:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4684
  
cc @zentol 


> Support within clause in MatchRecognize
> ---
>
> Key: FLINK-7603
> URL: https://issues.apache.org/jira/browse/FLINK-7603
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7603) Support within clause in MatchRecognize

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173625#comment-16173625
 ] 

ASF GitHub Bot commented on FLINK-7603:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4684#discussion_r140050554
  
--- Diff: docs/ops/cli.md ---
@@ -138,17 +138,30 @@ This allows the job to finish processing all inflight 
data.
 
 [Savepoints]({{site.baseurl}}/ops/state/savepoints.html) are controlled 
via the command line client:
 
- Trigger a savepoint
+ Trigger a Savepoint
 
 {% highlight bash %}
-./bin/flink savepoint  [savepointDirectory]
+./bin/flink savepoint  [savepointDirectory]
 {% endhighlight %}
 
-Returns the path of the created savepoint. You need this path to restore 
and dispose savepoints.
+This will trigger a savepoint for the job with ID `jobId`, and returns the 
path of the created savepoint. You need this path to restore and dispose 
savepoints.
 
-You can optionally specify a `savepointDirectory` when triggering the 
savepoint. If you don't specify one here, you need to configure a default 
savepoint directory for the Flink installation (see 
[Savepoints]({{site.baseurl}}/ops/state/savepoints.html#configuration)).
 
-# Cancel with a savepoint
+Furthermore, you can optionally specify a target file system directory to 
store the savepoint in. The directory needs to be accessible by the JobManager.
--- End diff --

What do you mean by 'specifying the target path'? AFAIU, it has to be a 
directory, because the checkpoint dir name is a random hash.

The wording of this part is borrowed from [the original 
doc](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/cli.html#trigger-a-savepoint)


> Support within clause in MatchRecognize
> ---
>
> Key: FLINK-7603
> URL: https://issues.apache.org/jira/browse/FLINK-7603
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4684: [FLINK-7603][savepoint/doc] Document how to take a savepo...

2017-09-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4684
  
cc @zentol 


---


[GitHub] flink pull request #4684: [FLINK-7603][savepoint/doc] Document how to take a...

2017-09-20 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4684#discussion_r140050554
  
--- Diff: docs/ops/cli.md ---
@@ -138,17 +138,30 @@ This allows the job to finish processing all inflight 
data.
 
 [Savepoints]({{site.baseurl}}/ops/state/savepoints.html) are controlled 
via the command line client:
 
- Trigger a savepoint
+ Trigger a Savepoint
 
 {% highlight bash %}
-./bin/flink savepoint  [savepointDirectory]
+./bin/flink savepoint  [savepointDirectory]
 {% endhighlight %}
 
-Returns the path of the created savepoint. You need this path to restore 
and dispose savepoints.
+This will trigger a savepoint for the job with ID `jobId`, and returns the 
path of the created savepoint. You need this path to restore and dispose 
savepoints.
 
-You can optionally specify a `savepointDirectory` when triggering the 
savepoint. If you don't specify one here, you need to configure a default 
savepoint directory for the Flink installation (see 
[Savepoints]({{site.baseurl}}/ops/state/savepoints.html#configuration)).
 
-# Cancel with a savepoint
+Furthermore, you can optionally specify a target file system directory to 
store the savepoint in. The directory needs to be accessible by the JobManager.
--- End diff --

What do you mean by 'specifying the target path'? AFAIU, it has to be a 
directory, because the checkpoint dir name is a random hash.

The wording of this part is borrowed from [the original 
doc](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/cli.html#trigger-a-savepoint)


---


[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173526#comment-16173526
 ] 

ASF GitHub Bot commented on FLINK-7647:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4691

[FLINK-7647] [flip6] Port JobManagerConfigHandler to new REST endpoint

## What is the purpose of the change

This PR ports the existing `JobManagerConfigHandler` to the new REST 
endpoint. This includes introducing the `ClusterConfiguration` response and 
`ClusterConfigurationHeaders`. The `DispatcherRestEndpoint` now registers the 
`JobManagerConfigHandler`.

Additionally, this PR also contains other cosmetic changes, such as 
properly renaming the `JobManagerConfigHandler` to `ClusterConfigHandler`, and 
refactoring common test logic for marshalling / unmarshalling of REST responses.

## Brief change log

- Let `JobManagerConfigHandler` implement `LegacyRestHandler`
- Register `JobManagerConfigHandler` at `DispatcherRestEndpoint`
- Rename `JobManagerConfigHandler` to `ClusterConfigHandler`
- Introduce `RestResponseMarshallingTestBase`

## Verifying this change

This change is a trivial rework / code cleanup.
The only additional test is `ClusterConfigurationTest`, which covers the 
marshalling of the `ClusterConfiguration`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink portJobManagerConfigHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4691.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4691


commit bcfdd1ed3f3b9d4cd29e694c65d30a6711ee9d58
Author: Till Rohrmann 
Date:   2017-08-21T13:11:08Z

[FLINK-7535] Port DashboardConfigHandler to new REST endpoint

Lets DashboardConfigHandler implement the LegacyRestHandler. Moreover, this
commit defines the appropriate DashboardConfigurationHeaders.

The DispatcherRestEndpoint registers the DashboardConfigHandler.

This closes #4604.

commit ade082fbb27419a8e1a18d0086be267a3721e598
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-20T16:25:28Z

[FLINK-7647] [flip6] Port JobManagerConfigHandler to new REST endpoint

This commit lets the JobManagerConfigHandler implement the
LegacyRestHandler interface in order to be ported to the new REST
endpoint. This includes the introduction of ClusterConfiguration
response body and ClusterConfigurationHeaders.

The DispatcherRestEndpoint now also registers the
JobManagerConfigHandler.

commit 890faba271194a49fe9e5efea81917b236fc3b0a
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-20T16:51:27Z

[FLINK-7647] [flip6] Rename JobManagerConfigHandler to ClusterConfigHandler

The original naming wouldn't make sense for the FLIP-6 redesign, since
we would have multiple per-job JobManagers for each cluster, all of which
share the same configuration.

The REST path is still left untouched and not part of this commit, as
that would involve more changes in flink-runtime-web.

commit 16902cc8775b03f79770a998873dc0786b73264f
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-20T17:07:11Z

[FLINK-7647] [flip6] Introduce test base for REST response marshalling

Introduces a common test base for REST responses. Each response type
should implement a subclass to verify that the response can be correctly
marshalled and unmarshalled.
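
The round-trip idea behind such a test base can be sketched in plain Java. This is only an illustration under stated assumptions: the class names, the `marshal` / `unmarshal` hooks and the `key=value` wire format are hypothetical and are not taken from the PR's `RestResponseMarshallingTestBase`, which may well be built differently.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of a marshalling round-trip test base (not the PR's code). */
final class MarshallingRoundTripSketch {

    /** Subclasses supply a sample response and the codec under test. */
    abstract static class TestBase<R> {
        protected abstract R createTestResponse();

        protected abstract String marshal(R response);

        protected abstract R unmarshal(String wire);

        /** Marshals the sample, unmarshals it again and compares with the original. */
        final void checkRoundTrip() {
            R expected = createTestResponse();
            R actual = unmarshal(marshal(expected));
            if (!Objects.equals(expected, actual)) {
                throw new AssertionError(
                        "round trip changed the response: " + expected + " -> " + actual);
            }
        }
    }

    /** Example subclass: a single configuration entry with a toy key=value wire format. */
    static final class ConfigEntryTest extends TestBase<Map.Entry<String, String>> {
        @Override
        protected Map.Entry<String, String> createTestResponse() {
            return new SimpleEntry<>("jobmanager.rpc.port", "6123");
        }

        @Override
        protected String marshal(Map.Entry<String, String> response) {
            return response.getKey() + "=" + response.getValue();
        }

        @Override
        protected Map.Entry<String, String> unmarshal(String wire) {
            int idx = wire.indexOf('=');
            return new SimpleEntry<>(wire.substring(0, idx), wire.substring(idx + 1));
        }
    }

    public static void main(String[] args) {
        new ConfigEntryTest().checkRoundTrip();
        System.out.println("round trip ok");
    }
}
```

The point of the design is that each new response type only has to provide a sample instance, while the comparison logic lives in one place.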




> Port JobManagerConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7647
> URL: https://issues.apache.org/jira/browse/FLINK-7647
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobManagerConfigHandler}} to new REST endpoint






[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173505#comment-16173505
 ] 

Kent Murra commented on FLINK-7657:
---

That's great, thanks!

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Kent Murra
>Assignee: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it throws an exception 
> (attached below). As a result, any SQL query with a timestamp, date, or 
> time literal cannot be executed if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>   at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 
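
The `ClassCastException` at the bottom of the trace can be reproduced with the JDK alone: `java.util.GregorianCalendar` is a `Calendar`, not a `java.util.Date`, so a blind cast fails when the literal already holds a calendar. The sketch below shows that failure and one defensive conversion that accepts both types. It is not the fix applied in Flink; `DateLiteralSketch` and `toCalendar` are hypothetical names, and the choice of UTC is an assumption.

```java
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.TimeZone;

/** Hypothetical helper illustrating the Calendar-vs-Date mismatch (not Flink code). */
final class DateLiteralSketch {

    /** Accepts either a Calendar or a Date (including java.sql subclasses). */
    static Calendar toCalendar(Object value) {
        if (value instanceof Calendar) {
            // Already a calendar: no conversion needed.
            return (Calendar) value;
        }
        if (value instanceof Date) {
            // Assumption: interpret the instant in UTC.
            Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            cal.setTime((Date) value);
            return cal;
        }
        throw new IllegalArgumentException("Unsupported literal value: " + value);
    }

    public static void main(String[] args) {
        Object literal = new GregorianCalendar(2017, Calendar.MAY, 1);
        try {
            Date broken = (Date) literal; // the kind of cast that fails in the trace
            System.out.println("unexpected: " + broken);
        } catch (ClassCastException e) {
            System.out.println("blind cast failed as expected");
        }
        System.out.println("safe conversion year: " + toCalendar(literal).get(Calendar.YEAR));
    }
}
```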

[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173502#comment-16173502
 ] 

ASF GitHub Bot commented on FLINK-7656:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4690

[FLINK-7656] [runtime] Switch to user classloader before calling 
initializeOnMaster and finalizeOnMaster.

## What is the purpose of the change

Fixes a classloading issue for `OutputFormat`s that implement 
`InitializeOnMaster` or `FinalizeOnMaster`. The initialize and finalize methods 
are called without setting the context classloader to the usercode classloader.

## Brief change log

* Set the context classloader to the usercode classloader before calling 
`InitializeOnMaster.initializeGlobal()` and `FinalizeOnMaster.finalizeGlobal()` 
and restore the original classloader afterwards.
* Extend test to check that the correct classloader is set.
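
The set-and-restore pattern described in the change log can be sketched with the JDK alone. This is a minimal illustration, not the code in the PR: `ContextClassLoaderSketch` and `callWithContextClassLoader` are hypothetical names, and an empty `URLClassLoader` stands in for the usercode classloader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

/** Hypothetical helper: run user code with a given context classloader, then restore. */
final class ContextClassLoaderSketch {

    static <T> T callWithContextClassLoader(ClassLoader userCodeLoader, Supplier<T> userCode) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(userCodeLoader);
        try {
            return userCode.get();
        } finally {
            // Restore the previous loader even if the user code throws.
            thread.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Stand-in for the usercode classloader.
        ClassLoader userLoader = new URLClassLoader(new URL[0], original);

        ClassLoader seen = callWithContextClassLoader(
                userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("user code saw the user loader: " + (seen == userLoader));
        System.out.println("original restored: "
                + (Thread.currentThread().getContextClassLoader() == original));
    }
}
```

The `try` / `finally` matters because a failing `initializeGlobal()` or `finalizeGlobal()` call would otherwise leave the thread with the wrong context classloader.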

## Verifying this change

* Run `JobTaskVertexTest.testOutputFormatVertex()`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **YES**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink outputOnMasterFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4690.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4690


commit 2dfc8a4550981b9b8c417585dfb5e02b3282c77a
Author: Fabian Hueske 
Date:   2017-09-20T14:26:47Z

[FLINK-7656] [runtime] Switch to user classloader before calling 
initializeOnMaster and finalizeOnMaster.




> Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
> 
>
> Key: FLINK-7656
> URL: https://issues.apache.org/jira/browse/FLINK-7656
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> The contract that Flink provides to usercode is that the usercode 
> classloader is the context classloader whenever usercode is called.
> In {{OutputFormatVertex}} usercode is called in the {{initializeOnMaster()}} 
> and {{finalizeOnMaster()}} methods but the context classloader is not set to 
> the usercode classloader.






[jira] [Assigned] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints

2017-09-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7486:


Assignee: Bhumika Bayani

> flink-mesos: Support for adding unique attribute / group_by attribute 
> constraints
> -
>
> Key: FLINK-7486
> URL: https://issues.apache.org/jira/browse/FLINK-7486
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.3.2
>Reporter: Bhumika Bayani
>Assignee: Bhumika Bayani
>
> In our setup, we have multiple mesos-workers. In spite of this, the Flink 
> application master most of the time ends up spawning all task-managers on 
> the same mesos-worker.
> We intend to ensure HA of task managers. We would like to make sure each 
> task-manager runs on a different mesos-worker, and on a mesos-worker that 
> does not share the AZ attribute with earlier task manager instances.
> Netflix-fenzo supports adding UniqueHostAttribute and BalancedHostAttribute 
> constraints. Flink-mesos should also enable us to add these kinds of 
> constraints.





[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173486#comment-16173486
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140009815
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
--- End diff --

We can use a `MapView` here. This feature was recently added and 
automatically backs the Map with a MapState if possible. Otherwise, it uses a 
Java HashMap (as right now). The benefit of backing the accumulator by MapState 
is that only the keys and values that are accessed need to be deserialized. In 
contrast, a regular HashMap is completely de/serialized every time the 
accumulator is read. Using MapView would require that the accumulator is 
implemented as a POJO (instead of a Tuple1). 

Check this class for details 
[MapView](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala)
 and let me know if you have questions.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173484#comment-16173484
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140022994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] 
{
+override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E]()
+acc.f0 = new util.HashMap[E, Integer]()
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.containsKey(value)) {
+val add = (x: Integer, y: Integer) => x + y
--- End diff --

`add` is not used, right?
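
On the unused `add`: `java.util.Map.merge` already covers both branches of `accumulate`. It stores the given value when the key is absent and applies the remapping function otherwise, so the `containsKey` check may not be needed either. A minimal Java sketch of that idea, using a plain `HashMap` instead of the PR's `CollectAccumulator` (the class and method names below are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical multiset counter showing that Map.merge handles new and existing keys. */
final class MultisetCountSketch {

    /** Increments the count for value; null values are ignored, as in the PR. */
    static <E> void accumulate(Map<E, Integer> counts, E value) {
        if (value != null) {
            // Absent key: stores 1. Present key: stores oldCount + 1.
            counts.merge(value, 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        accumulate(counts, "a");
        accumulate(counts, "b");
        accumulate(counts, "a");
        accumulate(counts, null);
        System.out.println("a -> " + counts.get("a"));
        System.out.println("b -> " + counts.get("b"));
    }
}
```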




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173485#comment-16173485
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140026944
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
 ---
@@ -329,6 +329,35 @@ class AggregateITCase(
   }
 
   @Test
+  def testTumbleWindowAggregateWithCollect(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery =
+  "SELECT b, COLLECT(b)" +
--- End diff --

Collect should be added to the [SQL 
documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#built-in-functions)
 under "Built-in Function" -> "Aggregate Functions"

Moreover, we should add `MULTISET` to the [supported data 
types](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#data-types).

It would also be nice if you could open a JIRA to add support for COLLECT 
to the Table API. We try to keep both in sync and it helps if we have a list of 
things that need to be added.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173483#comment-16173483
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140025368
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MultisetRelDataType.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.MultisetSqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+class MultisetRelDataType(
+val typeInfo: TypeInformation[_],
+elementType: RelDataType,
+isNullable: Boolean)
+  extends MultisetSqlType(
+elementType,
+isNullable) {
+
+  override def toString = s"MULTISET($typeInfo)"
--- End diff --

This should rather be `s"MULTISET($elementType)"`. `TypeInformation` is a Flink 
concept, whereas `RelDataType` belongs to the Calcite context.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173487#comment-16173487
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140024187
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
+    override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E]()
+    acc.f0 = new util.HashMap[E, Integer]()
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.containsKey(value)) {
+        val add = (x: Integer, y: Integer) => x + y
+        accumulator.f0.merge(value, 1, addFunction)
+      } else {
+        accumulator.f0.put(value, 1)
+      }
+    }
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+    if (accumulator.f0.size() > 0) {
+      new util.HashMap(accumulator.f0)
+    } else {
+      null.asInstanceOf[util.Map[E, Integer]]
+    }
+  }
+
+  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
+    acc.f0.clear()
+  }
+
+  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
+    new TupleTypeInfo(
+      classOf[CollectAccumulator[E]],
+      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
--- End diff --

Don't use a generic type here. It results in a KryoSerializer, which can be 
quite inefficient and produces state that cannot be upgraded. Use 
`MapTypeInformation` instead.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140022994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
+    override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E]()
+    acc.f0 = new util.HashMap[E, Integer]()
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.containsKey(value)) {
+        val add = (x: Integer, y: Integer) => x + y
--- End diff --

`add` is not used, right?
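
A minimal sketch of how `accumulate` could look once the unused `add` lambda is dropped. It relies on the documented behaviour of `java.util.Map.merge`, which stores the given value when the key is absent, so the `containsKey` branch would not be needed either. The object name `MultisetCounting` and the plain `util.Map` parameter (in place of `CollectAccumulator`) are assumptions made only to keep the example free of Flink dependencies.

```scala
import java.util
import java.util.function.BiFunction

// Hypothetical stand-alone illustration; not the code from the pull request.
object MultisetCounting {

  // Adds two counts. Written as an explicit BiFunction so that it does not
  // depend on lambda-to-SAM conversion in the Scala compiler.
  private val addFunction = new BiFunction[Integer, Integer, Integer] {
    override def apply(t: Integer, u: Integer): Integer =
      Integer.valueOf(t.intValue + u.intValue)
  }

  // Increments the count of `value`; null values are ignored, as in the PR.
  // Map.merge stores 1 for an absent key and applies addFunction otherwise.
  def accumulate[E](counts: util.Map[E, Integer], value: E): Unit = {
    if (value != null) {
      counts.merge(value, Integer.valueOf(1), addFunction)
    }
  }
}
```

Calling `MultisetCounting.accumulate(counts, "a")` twice on an empty `util.HashMap[String, Integer]` leaves a single entry for `"a"` with count 2.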


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140026944
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
 ---
@@ -329,6 +329,35 @@ class AggregateITCase(
   }
 
   @Test
+  def testTumbleWindowAggregateWithCollect(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery =
+      "SELECT b, COLLECT(b)" +
--- End diff --

`COLLECT` should be added to the [SQL documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#built-in-functions) under "Built-in Functions" -> "Aggregate Functions".

Moreover, we should add `MULTISET` to the [supported data types](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#data-types).

It would also be nice if you could open a JIRA to add support for `COLLECT` 
to the Table API. We try to keep both in sync, and it helps if we have a list of 
things that need to be added.


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140009815
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
--- End diff --

We can use a `MapView` here. This feature was recently added and 
automatically backs the map with a `MapState` if possible; otherwise, it uses a 
Java `HashMap` (as it does right now). The benefit of backing the accumulator 
with `MapState` is that only the keys and values that are accessed need to be 
deserialized. In contrast, a regular `HashMap` is completely de/serialized 
every time the accumulator is read. Using `MapView` would require the 
accumulator to be implemented as a POJO (instead of a `Tuple1`).

See [MapView](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala) for details, and let me know if you have questions.


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140025368
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MultisetRelDataType.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.MultisetSqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+class MultisetRelDataType(
+    val typeInfo: TypeInformation[_],
+    elementType: RelDataType,
+    isNullable: Boolean)
+  extends MultisetSqlType(
+    elementType,
+    isNullable) {
+
+  override def toString = s"MULTISET($typeInfo)"
--- End diff --

This should rather be `s"MULTISET($elementType)"`. `TypeInformation` is a Flink 
concept, whereas `RelDataType` belongs to the Calcite context.


---


[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173442#comment-16173442
 ] 

Fabian Hueske commented on FLINK-7657:
--

Thanks for reporting this bug and investigating the cause [~kmurra]!
Looking forward to your bugfix.

I've given you contributor permissions and assigned the issue to you. 
Hope that's OK.

Best, Fabian

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it throws an exception 
> (attached below). As a result, any SQL query with a timestamp, date, or 
> time literal cannot be executed if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>   at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at 

[jira] [Assigned] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-7657:


Assignee: Kent Murra

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Kent Murra
>Assignee: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it throws an exception 
> (attached below). As a result, any SQL query with a timestamp, date, or 
> time literal cannot be executed if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>   at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 

[jira] [Updated] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kent Murra updated FLINK-7657:
--
Affects Version/s: 1.3.1

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it throws an exception 
> (attached below). As a result, any SQL query with a timestamp, date, or 
> time literal cannot be executed if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>   at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 

[jira] [Updated] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kent Murra updated FLINK-7657:
--
Affects Version/s: 1.3.2

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it throws an exception 
> (attached below). As a result, any SQL query with a timestamp, date, or 
> time literal cannot be executed if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>   at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 

[jira] [Comment Edited] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173419#comment-16173419
 ] 

Kent Murra edited comment on FLINK-7657 at 9/20/17 4:12 PM:


I'm looking at taking my bugfix and formatting it as a code submission, but 
need some time to get it aligned with standards and write some reproduction 
test cases.

Also: I marked this as Critical per the linked guidelines as this causes an 
exception (and therefore crash) in well-formed Flink Table programs.


was (Author: kmurra):
I'm looking at taking my bugfix and formatting it as a code submission, but 
need some time to get it aligned with standards and write some reproduction 
test cases.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that contains a timestamp. When 
> the execution environment tries to optimize the SQL, it fails with an 
> exception (attached below). As a result, any SQL query with a timestamp, 
> date, or time literal cannot be executed if any table source implements 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>   at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> 

[jira] [Updated] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kent Murra updated FLINK-7657:
--
Description: 
I have a SQL statement using the Table API that contains a timestamp. When 
the execution environment tries to optimize the SQL, it fails with an 
exception (attached below). As a result, any SQL query with a timestamp, 
date, or time literal cannot be executed if any table source implements 
FilterableTableSource. 

{code:none} 
Exception in thread "main" java.lang.RuntimeException: Error while applying 
rule PushFilterIntoTableSourceScanRule, args 
[rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
 $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
fields:(data, last_updated))]
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
at 
org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
at 
org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
at 
org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
at 
com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
at 
com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at 
com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be 
cast to java.util.Date
at 
org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
at 
org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
... 19 more
{code}
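
The `Caused by` line in the trace above reports a `java.util.GregorianCalendar` being cast to `java.util.Date`. The following is a minimal standalone Java sketch of that failure mode, not Flink code: the class `CalendarCastSketch` and the helpers `blindCastFails` and `toEpochMillis` are hypothetical names introduced only for illustration. It shows why a blind cast fails when the literal value arrives as a `Calendar`, and one possible way to accept both representations.

```java
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class CalendarCastSketch {

    // Hypothetical helper: mimics code that assumes the value is always a Date.
    static boolean blindCastFails(Object value) {
        try {
            Date ignored = (Date) value; // throws if value is a Calendar
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Hypothetical helper: checks the runtime type instead of assuming one.
    static long toEpochMillis(Object value) {
        if (value instanceof Date) {
            return ((Date) value).getTime();
        } else if (value instanceof Calendar) {
            return ((Calendar) value).getTimeInMillis();
        }
        throw new IllegalArgumentException("Unsupported literal type: " + value.getClass());
    }

    public static void main(String[] args) {
        // Build a calendar for 2017-05-01 00:00 UTC, the date seen in the trace.
        Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();
        cal.set(2017, Calendar.MAY, 1);

        Object literal = cal; // the value arrives as a Calendar, not a Date
        System.out.println("blind cast fails: " + blindCastFails(literal));
        System.out.println("epoch millis: " + toEpochMillis(literal));
    }
}
```

Whether the actual fix belongs in the literal conversion or earlier in the translation is not something this sketch can answer; it only isolates the type mismatch named in the exception.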

I've done quite a bit of debugging on this and 

[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173419#comment-16173419
 ] 

Kent Murra commented on FLINK-7657:
---

I'm looking at taking my bugfix and formatting it as a code submission, but 
need some time to get it aligned with standards and write some reproduction 
test cases.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that contains a timestamp. When 
> the execution environment tries to optimize the SQL, it fails with an 
> exception (attached below). As a result, any SQL query with a timestamp, 
> date, or time literal cannot be executed if any table source implements 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>   at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 

[jira] [Created] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-20 Thread Kent Murra (JIRA)
Kent Murra created FLINK-7657:
-

 Summary: SQL Timestamps Converted To Wrong Type By Optimizer 
Causing ClassCastException
 Key: FLINK-7657
 URL: https://issues.apache.org/jira/browse/FLINK-7657
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Kent Murra
Priority: Critical


I have a SQL statement using the Table API that contains a timestamp. When 
the execution environment tries to optimize the SQL, it fails with an 
exception (attached below). As a result, any SQL query with a timestamp, 
date, or time literal cannot be executed if any table source implements 
FilterableTableSource. 

{code:none} 
Exception in thread "main" java.lang.RuntimeException: Error while applying 
rule PushFilterIntoTableSourceScanRule, args 
[rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
 $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
fields:(data, last_updated))]
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be 
cast to java.util.Date
at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
... 19 more
{code}

I've done quite a bit of debugging on this and tracked it down to a problem 
with the way a Calcite AST is translated into an Expression tree for the 

[GitHub] flink pull request #4603: [FLINK-7534] Create LegacyRestHandlerAdapter for o...

2017-09-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4603


---


[jira] [Commented] (FLINK-7534) Create LegacyRestHandlerAdapter for the old REST handlers

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173388#comment-16173388
 ] 

ASF GitHub Bot commented on FLINK-7534:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4603


> Create LegacyRestHandlerAdapter for the old REST handlers
> -
>
> Key: FLINK-7534
> URL: https://issues.apache.org/jira/browse/FLINK-7534
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Create a {{LegacyRestHandlerAdapter}} to integrate the old REST handler with 
> the new {{RestServerEndpoint}} and the {{AbstractRestHandler}}.





[jira] [Closed] (FLINK-7534) Create LegacyRestHandlerAdapter for the old REST handlers

2017-09-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7534.

   Resolution: Fixed
Fix Version/s: 1.4.0

Added via dbabdb1cc2c122dbf1e83ffb9960491eaf4914bb

> Create LegacyRestHandlerAdapter for the old REST handlers
> -
>
> Key: FLINK-7534
> URL: https://issues.apache.org/jira/browse/FLINK-7534
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Create a {{LegacyRestHandlerAdapter}} to integrate the old REST handler with 
> the new {{RestServerEndpoint}} and the {{AbstractRestHandler}}.





[jira] [Commented] (FLINK-7655) Revisit default non-leader id for FencedRpcEndpoints

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173367#comment-16173367
 ] 

ASF GitHub Bot commented on FLINK-7655:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4689

[FLINK-7655] [flip6] Set fencing token to null if not leader

## What is the purpose of the change

This commit changes the fencing behaviour such that a component which is not the
leader sets its fencing token to null. This distinction makes it possible to throw
different exceptions depending on whether there is a token mismatch or whether the
receiver has no fencing token set (i.e. it is not the leader).

## Brief change log

- allow the fencing token to be null in `FencedRpcEndpoint`
- allow `FencedAkkaInvocationHandler` to send a `null` fencing token
- filter out messages if component is not a leader in `FencedAkkaRpcActor`
- adapt `Dispatcher`, `ResourceManager` and `JobMaster` to set the fencing token to `null` at start-up
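
The distinction between "no token set" and "token mismatch" can be sketched as follows. This is only an illustration under stated assumptions: the class FencingSketch, its methods, and both exception types are hypothetical names, not the actual `FencedRpcEndpoint` or `FencedAkkaRpcActor` code.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch; class, method and exception names are illustrative only.
final class FencingSketch {

    /** Thrown when the receiver holds no fencing token, i.e. it is not the leader. */
    static final class NoFencingTokenException extends Exception {
        NoFencingTokenException(String message) {
            super(message);
        }
    }

    /** Thrown when the receiver is the leader but the sender used a different token. */
    static final class TokenMismatchException extends Exception {
        TokenMismatchException(String message) {
            super(message);
        }
    }

    // null encodes "not the leader"; this is also the value at start-up.
    private UUID fencingToken = null;

    void grantLeadership(UUID newToken) {
        fencingToken = Objects.requireNonNull(newToken);
    }

    void revokeLeadership() {
        fencingToken = null;
    }

    /** Accepts a message only if the receiver is leader and the tokens match. */
    void checkToken(UUID senderToken) throws NoFencingTokenException, TokenMismatchException {
        if (fencingToken == null) {
            throw new NoFencingTokenException("Receiver has no fencing token set (not the leader).");
        }
        if (!fencingToken.equals(senderToken)) {
            throw new TokenMismatchException(
                "Expected fencing token " + fencingToken + " but received " + senderToken + ".");
        }
    }
}
```

With a random placeholder token, both failure cases would look like a mismatch to the sender; using `null` lets the receiver report which of the two actually happened.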

## Verifying this change

This change is already covered by existing tests, such as 
`FencedRpcEndpointTest` and `AsyncCallsTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink defaultLeaderIds

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4689.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4689


commit c4bf767afcc28f96fbfed35cc97ce58b5de009fa
Author: Till Rohrmann 
Date:   2017-09-20T15:39:35Z

[FLINK-7655] [flip6] Set fencing token to null if not leader

This commit changes the fencing behaviour such that a component which is not the
leader sets its fencing token to null. This distinction makes it possible to throw
different exceptions depending on whether there is a token mismatch or whether the
receiver has no fencing token set (i.e. it is not the leader).




> Revisit default non-leader id for FencedRpcEndpoints
> 
>
> Key: FLINK-7655
> URL: https://issues.apache.org/jira/browse/FLINK-7655
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, when a {{FencedRpcEndpoint}} loses leadership, we set its leader 
> id to a random value. This can be problematic, even though it's unlikely, 
> because we might set it to a value which is used somewhere else (e.g. the 
> currently valid leader id). I think it would be better to simply set the 
> leader id to {{null}} in order to properly encode that the 
> {{FencedRpcEndpoint}} is no longer a leader.





[GitHub] flink pull request #4689: [FLINK-7655] [flip6] Set fencing token to null if ...

2017-09-20 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4689

[FLINK-7655] [flip6] Set fencing token to null if not leader

## What is the purpose of the change

This commit changes the fencing behaviour such that a component which is not the
leader sets its fencing token to null. This distinction makes it possible to throw
different exceptions depending on whether there is a token mismatch or whether the
receiver has no fencing token set (i.e. it is not the leader).

## Brief change log

- allow the fencing token to be null in `FencedRpcEndpoint`
- allow `FencedAkkaInvocationHandler` to send a `null` fencing token
- filter out messages if component is not a leader in `FencedAkkaRpcActor`
- adapt `Dispatcher`, `ResourceManager` and `JobMaster` to set the fencing token to `null` at start-up

## Verifying this change

This change is already covered by existing tests, such as 
`FencedRpcEndpointTest` and `AsyncCallsTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink defaultLeaderIds

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4689.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4689


commit c4bf767afcc28f96fbfed35cc97ce58b5de009fa
Author: Till Rohrmann 
Date:   2017-09-20T15:39:35Z

[FLINK-7655] [flip6] Set fencing token to null if not leader

This commit changes the fencing behaviour such that a component which is not the
leader sets its fencing token to null. This distinction makes it possible to throw
different exceptions depending on whether there is a token mismatch or whether the
receiver has no fencing token set (i.e. it is not the leader).




---


[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173295#comment-16173295
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4688

[FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to new REST endpoint

## What is the purpose of the change

Ports the CurrentJobsOverviewHandler to the new REST endpoint by letting it implement
the LegacyRestHandler interface. This commit changes the JobDetails JSON such that it
now contains the number of tasks for each ExecutionState, including SCHEDULED,
DEPLOYING, CREATED and RECONCILING. These states will now also be displayed in the
web frontend.
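
As a rough illustration of "number of tasks for each ExecutionState", the sketch below counts tasks per state and renders the counters as JSON. The enum State is a hypothetical stand-in (the RUNNING and FINISHED entries are assumptions added for the example), and the JSON field layout is not the actual JobDetails format.

```java
import java.util.List;

// Hypothetical sketch; State is a stand-in enum, not Flink's ExecutionState class.
final class TaskCountSketch {

    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, RECONCILING }

    /** Returns one counter per state, indexed by the state's ordinal. */
    static int[] countPerState(List<State> taskStates) {
        int[] counts = new int[State.values().length];
        for (State state : taskStates) {
            counts[state.ordinal()]++;
        }
        return counts;
    }

    /** Renders the counters as a flat JSON object with one field per state. */
    static String toJson(int[] counts) {
        StringBuilder sb = new StringBuilder("{");
        State[] states = State.values();
        for (int i = 0; i < states.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(states[i].name()).append("\":").append(counts[i]);
        }
        return sb.append('}').toString();
    }
}
```

Emitting a counter for every state, including those with zero tasks, keeps the JSON shape stable for a frontend that renders one column per state.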

## Verifying this change

- Have tested manually that the old web frontend is still working
- Added `JobDetailsTest` and `MultipleJobsDetailsTest`, which test the un/marshalling

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portCurrentJobsOverviewHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4688.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4688


commit d794ac015dfa081f637b5bb490919cef7b3eed3a
Author: Till Rohrmann 
Date:   2017-08-18T14:18:19Z

[FLINK-7534] Create LegacyRestHandlerAdapter for old REST handlers

Introduce the LegacyRestHandler interface, which the old REST handlers have to implement
in order to be usable by the RestServerEndpoint in combination with the
LegacyRestHandlerAdapter. The LegacyRestHandlerAdapter extends the AbstractRestHandler
and runs the LegacyRestHandler implementation.

As an example, this commit ports the ClusterOverviewHandler to the new interface. The
Dispatcher side still has to be properly implemented.

commit dd21855227b577c09ed69a43e31aad2bfd23f95a
Author: Till Rohrmann 
Date:   2017-08-21T13:11:08Z

[FLINK-7535] Port DashboardConfigHandler to new REST endpoint

Lets DashboardConfigHandler implement the LegacyRestHandler. Moreover, this
commit defines the appropriate DashboardConfigurationHeaders.

The DispatcherRestEndpoint registers the DashboardConfigHandler.

This closes #4604.

commit 60c4ac7b0456167e0a3e4e067b7d6641ca7cd5b6
Author: Till Rohrmann 
Date:   2017-09-18T20:58:22Z

[FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to new REST endpoint

Ports the CurrentJobsOverviewHandler to the new REST endpoint by letting it implement
the LegacyRestHandler interface. This commit changes the JobDetails JSON such that it
now contains the number of tasks for each ExecutionState, including SCHEDULED,
DEPLOYING, CREATED and RECONCILING. These states will now also be displayed in the
web frontend.




> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.





[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...

2017-09-20 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4688

[FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to new REST endpoint

## What is the purpose of the change

Ports the CurrentJobsOverviewHandler to the new REST endpoint by letting it implement
the LegacyRestHandler interface. This commit changes the JobDetails JSON such that it
now contains the number of tasks for each ExecutionState, including SCHEDULED,
DEPLOYING, CREATED and RECONCILING. These states will now also be displayed in the
web frontend.

## Verifying this change

- Have tested manually that the old web frontend is still working
- Added `JobDetailsTest` and `MultipleJobsDetailsTest`, which test the un/marshalling

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portCurrentJobsOverviewHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4688.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4688


commit d794ac015dfa081f637b5bb490919cef7b3eed3a
Author: Till Rohrmann 
Date:   2017-08-18T14:18:19Z

[FLINK-7534] Create LegacyRestHandlerAdapter for old REST handlers

Introduce the LegacyRestHandler interface, which the old REST handlers have to implement
in order to be usable by the RestServerEndpoint in combination with the
LegacyRestHandlerAdapter. The LegacyRestHandlerAdapter extends the AbstractRestHandler
and runs the LegacyRestHandler implementation.

As an example, this commit ports the ClusterOverviewHandler to the new interface. The
Dispatcher side still has to be properly implemented.

commit dd21855227b577c09ed69a43e31aad2bfd23f95a
Author: Till Rohrmann 
Date:   2017-08-21T13:11:08Z

[FLINK-7535] Port DashboardConfigHandler to new REST endpoint

Lets DashboardConfigHandler implement the LegacyRestHandler. Moreover, this
commit defines the appropriate DashboardConfigurationHeaders.

The DispatcherRestEndpoint registers the DashboardConfigHandler.

This closes #4604.

commit 60c4ac7b0456167e0a3e4e067b7d6641ca7cd5b6
Author: Till Rohrmann 
Date:   2017-09-18T20:58:22Z

[FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to new REST endpoint

Ports the CurrentJobsOverviewHandler to the new REST endpoint by letting it implement
the LegacyRestHandler interface. This commit changes the JobDetails JSON such that it
now contains the number of tasks for each ExecutionState, including SCHEDULED,
DEPLOYING, CREATED and RECONCILING. These states will now also be displayed in the
web frontend.




---


[jira] [Updated] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2017-09-20 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5372:

Priority: Blocker  (was: Major)

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator
>         implements OneInputStreamOperator {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count",
>                         StringSerializer.INSTANCE, "hello"));
>     }
>     @Override
>     public void processElement(StreamRecord element) throws Exception {
>         // we also don't care
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count",
>                         StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}





[jira] [Updated] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2017-09-20 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5372:

Fix Version/s: 1.4.0

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator
>         implements OneInputStreamOperator {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count",
>                         StringSerializer.INSTANCE, "hello"));
>     }
>     @Override
>     public void processElement(StreamRecord element) throws Exception {
>         // we also don't care
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count",
>                         StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}





[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2017-09-20 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173278#comment-16173278
 ] 

Aljoscha Krettek commented on FLINK-4228:
-

The other parts are not relevant anymore since the RocksDB backend has evolved 
quite a bit.

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
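
The suggestion to "manually create folders and copy only actual files" could look roughly like the sketch below. It is an assumption-laden illustration: it uses the local file system via java.nio as a stand-in for the S3 upload, and CopyFilesSketch and copyRegularFiles are hypothetical names rather than anything in Flink, Hadoop, or the AWS SDK.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; the local file system stands in for the remote target.
final class CopyFilesSketch {

    /**
     * Copies every regular file below sourceDir to the same relative location
     * under targetDir and returns the number of files copied. Parent directories
     * are created explicitly, and a directory is never handed to the copy call.
     */
    static int copyRegularFiles(Path sourceDir, Path targetDir) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(sourceDir)) {
            // Skip directories: only regular files are candidates for the copy.
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        for (Path file : files) {
            Path target = targetDir.resolve(sourceDir.relativize(file).toString());
            Files.createDirectories(target.getParent());
            Files.copy(file, target);
        }
        return files.size();
    }
}
```

Filtering before copying avoids the failure in the stack trace above, where a directory path (`local-chk-2886`) was opened as if it were a file.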





[jira] [Updated] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2017-09-20 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4228:

Priority: Blocker  (was: Major)

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.





[jira] [Updated] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2017-09-20 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4228:

Fix Version/s: 1.4.0

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
> Fix For: 1.4.0
>
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.





[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2017-09-20 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173277#comment-16173277
 ] 

Aljoscha Krettek commented on FLINK-4228:
-

Moved to critical for the part about checking whether submission using s3a as 
configured FileSystem works.

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.





[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173246#comment-16173246
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139986193
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 ---
@@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{
 val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
 expectedOutput.add(new StreamRecord(
-  CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
+  CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7))
 expectedOutput.add(new StreamRecord(
-  CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
+  CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12))
 
 verify(expectedOutput, result, new RowResultSortComparator())
 
 testHarness.close()
   }
 
+  /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/
+  @Test
+  def testCommonRowTimeJoin() {
--- End diff --

Oh, sorry, I missed this part. Will add it soon.


> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on rowtime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition, e.g. {{o.rowtime BETWEEN s.rowtime AND s.rowtime + 
> INTERVAL '1' HOUR}}, may only use rowtime attributes that are system 
> attributes. It must define a bounded time range such as {{o.rowtime BETWEEN 
> s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}; unbounded 
> conditions that compare {{o.rowtime}} and {{s.rowtime}} with a single 
> inequality are not supported. The condition must also reference the rowtime 
> attributes of both streams, so {{o.rowtime between rowtime () and rowtime () + 
> 1}} is not supported either.
> A rowtime stream join will not be able to handle late data, because inserting 
> a late row into the sorted order would shift all other computations. 
> This would be too expensive to maintain. Therefore, we will throw an error if 
> a user tries to use a rowtime stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 
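
The bounded time condition described above can be illustrated with a small Java
sketch. It only shows the predicate itself and is not the operator proposed in
this issue: the class and method names are hypothetical, timestamps are assumed
to be epoch milliseconds, and the bounds are expressed as offsets relative to
the right-hand row's rowtime.

```java
/**
 * Hypothetical helper (not part of Flink) that illustrates a bounded
 * rowtime join condition between a left row and a right row.
 */
final class BoundedRowtimeJoinSketch {

    private BoundedRowtimeJoinSketch() {}

    /**
     * Returns true if leftTime lies in
     * [rightTime + lowerOffsetMs, rightTime + upperOffsetMs].
     * Both ends are inclusive, matching SQL BETWEEN semantics.
     */
    static boolean withinBounds(
            long leftTime, long rightTime, long lowerOffsetMs, long upperOffsetMs) {
        long diff = leftTime - rightTime;
        return diff >= lowerOffsetMs && diff <= upperOffsetMs;
    }

    public static void main(String[] args) {
        long hour = 60L * 60L * 1000L;
        // o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR
        System.out.println(withinBounds(hour / 2, 0L, 0L, hour));  // inside the window
        System.out.println(withinBounds(hour + 1L, 0L, 0L, hour)); // just outside
    }
}
```

A one-sided condition has no finite offset on one end, so rows could never be
dropped from state; this is presumably why only bounded ranges are allowed.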





[jira] [Closed] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window

2017-09-20 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7357.

   Resolution: Fixed
Fix Version/s: 1.3.3
   1.4.0
 Release Note: 
Fixed for 1.3.3 with cc71dec108f28562bca5f99c53950a7be6d1ba54
Fixed for 1.4.0 with df5efe9cead172994abb2fd4858a27caacd9468c

> HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY 
> HOP window
> -
>
> Key: FLINK-7357
> URL: https://issues.apache.org/jira/browse/FLINK-7357
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Rong Rong
>Assignee: Rong Rong
> Fix For: 1.4.0, 1.3.3
>
>
> The following SQL does not compile:
> {code:title=invalid_having_hop_start_sql}
> SELECT 
>   c AS k, 
>   COUNT(a) AS v, 
>   HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS 
> windowStart, 
>   HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd 
> FROM 
>   T1 
> GROUP BY 
>   HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), 
>   c 
> HAVING 
>   SUM(b) > 1
> {code}
> Keeping either the HAVING clause or the HOP_START field on its own compiles 
> and runs without issue.
> More details: 
> https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end





[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139986193
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 ---
@@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{
 val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
 expectedOutput.add(new StreamRecord(
-  CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
+  CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7))
 expectedOutput.add(new StreamRecord(
-  CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
+  CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12))
 
 verify(expectedOutput, result, new RowResultSortComparator())
 
 testHarness.close()
   }
 
+  /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/
+  @Test
+  def testCommonRowTimeJoin() {
--- End diff --

Oh, sorry, I missed this part. I will add it soon.


---


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173236#comment-16173236
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139983961
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139983961
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,
  

[jira] [Created] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster

2017-09-20 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-7656:


 Summary: Switch to user ClassLoader when invoking 
initializeOnMaster finalizeOnMaster
 Key: FLINK-7656
 URL: https://issues.apache.org/jira/browse/FLINK-7656
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.3.2
Reporter: Fabian Hueske
Assignee: Fabian Hueske


The contract that Flink provides to usercode is that the usercode 
classloader is the context classloader whenever usercode is called.

In {{OutputFormatVertex}} usercode is called in the {{initializeOnMaster()}} 
and {{finalizeOnMaster()}} methods but the context classloader is not set to 
the usercode classloader.
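
A fix along these lines usually amounts to swapping the thread's context
classloader around the call into usercode and restoring it afterwards. The
following is a minimal Java sketch of that pattern, not the actual change to
{{OutputFormatVertex}}: the helper class and method names are hypothetical, and
only standard JDK calls are used.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Flink) that runs a piece of usercode
 * with a given classloader installed as the thread's context classloader.
 */
final class ContextClassLoaderSketch {

    private ContextClassLoaderSketch() {}

    static <T> T callWithContextClassLoader(
            ClassLoader userCodeClassLoader, Supplier<T> userCode) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            // Usercode now sees userCodeClassLoader as the context classloader.
            return userCode.get();
        } finally {
            // Always restore the previous classloader, even if usercode throws.
            current.setContextClassLoader(original);
        }
    }
}
```

Restoring the classloader in a finally block keeps the calling thread in its
original state regardless of how the usercode call ends.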





[GitHub] flink pull request #4687: [hotfix][docs][CEP] wrong method name for PatternF...

2017-09-20 Thread alpinegizmo
GitHub user alpinegizmo opened a pull request:

https://github.com/apache/flink/pull/4687

[hotfix][docs][CEP] wrong method name for PatternFlatSelectFunction

The name of the single abstract method in the PatternFlatSelectFunction 
interface is flatSelect, not select.

This PR is just a trivial change to the docs.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alpinegizmo/flink PatternFlatSelectFunction

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4687.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4687


commit bf5bcf4b980f168fc01652df6d0bd0e0fe48a8ff
Author: David Anderson 
Date:   2017-09-20T13:57:40Z

[hotfix][docs] method name for PatternFlatSelectFunction should be 
flatSelect




---


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2017-09-20 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173193#comment-16173193
 ] 

Robert Metzger commented on FLINK-7608:
---

[~Zentol] Is there a nice way to solve this generically?
I fear that we need to implement some special casing into each MetricReporter 
to properly report data from the LatencyGauge.

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou_UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics to the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}
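
The gauge value shown above is a nested map (source descriptor to statistic
name to value), which reporters that expect a single numeric value per metric
may not handle well. One generic option, sketched here in Java as an assumption
and not as the approach decided in this issue, is to flatten the nested map
into one scalar entry per statistic. The class, the method, and the key naming
scheme are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink) that flattens a map-valued
 * latency gauge into one scalar entry per source and statistic.
 */
final class LatencyGaugeFlattenSketch {

    private LatencyGaugeFlattenSketch() {}

    static Map<String, Double> flatten(
            String metricName, Map<String, Map<String, Double>> gaugeValue) {
        Map<String, Double> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Double>> source : gaugeValue.entrySet()) {
            for (Map.Entry<String, Double> stat : source.getValue().entrySet()) {
                // Key layout: <metric>.<source descriptor>.<statistic>
                String key = metricName + "." + source.getKey() + "." + stat.getKey();
                flat.put(key, stat.getValue());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Double> stats = new LinkedHashMap<>();
        stats.put("p99", 116.0);
        stats.put("min", 11.0);
        Map<String, Map<String, Double>> gauge = new LinkedHashMap<>();
        gauge.put("vertexID=1,subtaskIndex=-1", stats);
        flatten("latency", gauge).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Each flattened entry could then be reported as an ordinary numeric gauge,
which avoids special casing in individual reporters at the cost of more
metric names.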





[GitHub] flink issue #4628: [FLINK-7486]:[flink-mesos]:Support for adding unique attr...

2017-09-20 Thread bbayani
Github user bbayani commented on the issue:

https://github.com/apache/flink/pull/4628
  
@EronWright , @tillrohrmann Can you please review this?


---


[jira] [Commented] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173192#comment-16173192
 ] 

ASF GitHub Bot commented on FLINK-7486:
---

Github user bbayani commented on the issue:

https://github.com/apache/flink/pull/4628
  
@EronWright , @tillrohrmann Can you please review this?


> flink-mesos: Support for adding unique attribute / group_by attribute 
> constraints
> -
>
> Key: FLINK-7486
> URL: https://issues.apache.org/jira/browse/FLINK-7486
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.3.2
>Reporter: Bhumika Bayani
>
> In our setup, we have multiple mesos-workers. In spite of this, the Flink 
> application master most of the time ends up spawning all task-managers on 
> the same mesos-worker.
> We intend to ensure HA of task managers. We would like to make sure each 
> task-manager runs on a different mesos-worker, and on a mesos-worker that 
> does not share the AZ attribute with earlier task manager instances.
> Netflix-fenzo supports adding UniqueHostAttribute and BalancedHostAttribute 
> constraints. Flink-mesos should also enable us to add these kinds of 
> constraints.





[GitHub] flink pull request #4572: [Flink-7243] [connectors] Add ParquetInputFormat

2017-09-20 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/4572#discussion_r139971451
  
--- Diff: flink-connectors/flink-parquet/pom.xml ---
@@ -0,0 +1,81 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+4.0.0
+
+
+org.apache.flink
+flink-connectors
+1.4-SNAPSHOT
+..
+
+
+flink-parquet_${scala.binary.version}
+flink-parquet
+
+jar
+
+
+1.8.2
--- End diff --

Why not parquet 1.9.0? Is there any good reason for that?


---


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2017-09-20 Thread Hai Zhou_UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173185#comment-16173185
 ] 

Hai Zhou_UTC+8 commented on FLINK-7608:
---

[~rmetzger] I am not familiar with JMX and do not know why it is a HashMap, so 
I do not know how to do it.

I will look into it when I have some free time.

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou_UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics to the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}





[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-20 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r139969639
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
+
+   this.serverAddress = checkNotNull(serverAddress);
+   this.blobClientConfig = checkNotNull(blobClientConfig);
+   this.readWriteLock = new ReentrantReadWriteLock();
+
+   // configure and create the storage directory
+   String storageDirectory = 
blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
+   this.storageDir = 
BlobUtils.initLocalStorageDirectory(storageDirectory);
+   LOG.info("Created transient BLOB cache storage directory " + 
storageDir);
+
+   // configure the number of fetch retries
+   final int fetchRetries = 
blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
+   if (fetchRetries >= 0) {
+   this.numFetchRetries = fetchRetries;
+   } else {
+  

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173180#comment-16173180
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r139969639
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
+
+   this.serverAddress = checkNotNull(serverAddress);
+   this.blobClientConfig = checkNotNull(blobClientConfig);
+   this.readWriteLock = new ReentrantReadWriteLock();
+
+   // configure and create the storage directory
+   String storageDirectory = 
blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
+   this.storageDir = 
BlobUtils.initLocalStorageDirectory(storageDirectory);
+   LOG.info("Created transient BLOB cache storage directory " + 
storageDir);
+
+   // configure the number of fetch retries
+  

[jira] [Assigned] (FLINK-7608) LatencyGauge change to histogram metric

2017-09-20 Thread Hai Zhou_UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou_UTC+8 reassigned FLINK-7608:
-

Assignee: (was: Hai Zhou_UTC+8)

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou_UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics to the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}





[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173176#comment-16173176
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r139968617
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job 
ref-counting and a staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be be accessible upon recovery and do not need to 
be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(PermanentBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /** Root directory for local file storage */
+   private final File storageDir;
+
+   /** Blob store for distributed file storage, e.g. in HA */
+   private final BlobView blobView;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the storage directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   // 

+
+   /**
+* Job reference counters with a time-to-live (TTL).
+*/
+   @VisibleForTesting
+   static class RefCount {
+   /**
+* Number of references to a job.
+*/
+   public int references = 0;
+
+   /**
+* Timestamp in milliseconds when any job data should be 
cleaned up (no cleanup for
+* non-positive values).
+*/
+   public long keepUntil = -1;
+ 

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173179#comment-16173179
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r139969076
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
--- End diff --

Yes, that's mostly why. Passing every parameter that the `BlobClient` 
requires individually would also be cumbersome and error-prone.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A 

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-20 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r139969076
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
--- End diff --

Yes, that's mostly why. Passing every parameter that the `BlobClient` 
requires individually would also be cumbersome and error-prone.


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-20 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r139968617
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job 
ref-counting and a staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be be accessible upon recovery and do not need to 
be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(PermanentBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /** Root directory for local file storage */
+   private final File storageDir;
+
+   /** Blob store for distributed file storage, e.g. in HA */
+   private final BlobView blobView;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the storage directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   // 

+
+   /**
+* Job reference counters with a time-to-live (TTL).
+*/
+   @VisibleForTesting
+   static class RefCount {
+   /**
+* Number of references to a job.
+*/
+   public int references = 0;
+
+   /**
+* Timestamp in milliseconds when any job data should be 
cleaned up (no cleanup for
+* non-positive values).
+*/
+   public long keepUntil = -1;
+   }
+
+   /** Map to store the number of references to a specific job */
+   private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
+
+   /** Time interval (ms) to run the cleanup task; also used as the 

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173169#comment-16173169
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r139966738
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job 
ref-counting and a staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be be accessible upon recovery and do not need to 
be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
--- End diff --

The latter is not possible because each class offers its own unique 
methods, e.g. `TransientBlobCache` allows deleting BLOBs, while 
`PermanentBlobCache` does not. I can come up with a common base class, though.
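
A common base class along these lines could look roughly like the sketch below. Every name in it is hypothetical (it is not the code from the pull request), a `String` stands in for `BlobKey`, and an in-memory map stands in for the local storage directory. It only illustrates the point made above: shared logic lives in the base class, while deletion is offered by the transient variant alone.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class holding the logic both caches share.
abstract class AbstractBlobCacheSketch {
    // In-memory stand-in for the local storage directory.
    protected final Map<String, byte[]> localFiles = new HashMap<>();

    void put(String key, byte[] data) {
        localFiles.put(key, data);
    }

    byte[] get(String key) {
        return localFiles.get(key);
    }
}

// Permanent variant: deliberately offers no delete method.
class PermanentCacheSketch extends AbstractBlobCacheSketch {
}

// Transient variant: adds deletion on top of the shared behaviour.
class TransientCacheSketch extends AbstractBlobCacheSketch {
    boolean delete(String key) {
        return localFiles.remove(key) != null;
    }
}
```

With this split, code holding a `PermanentCacheSketch` reference cannot delete BLOBs at compile time, which is why merging the two classes into one would not work.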


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc., 
> which does not even have to be reflected by files.
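
The per-job ref-counting with staged cleanup described in the quoted `PermanentBlobCache` Javadoc can be illustrated with the sketch below. It is an assumption-laden illustration, not the pull request's implementation: the class and method names are hypothetical, a `String` stands in for `JobID`, and timestamps are passed in explicitly instead of being read from a clock or driven by a `TimerTask`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of per-job reference counting with deferred cleanup.
class RefCountedJobCacheSketch {

    // Mirrors the idea of the RefCount class in the quoted diff.
    static final class RefCount {
        int references = 0;
        long keepUntil = -1; // non-positive: no cleanup scheduled
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMs;

    RefCountedJobCacheSketch(long cleanupIntervalMs) {
        this.cleanupIntervalMs = cleanupIntervalMs;
    }

    // Registers one more user of the job and cancels any pending cleanup.
    synchronized void registerJob(String jobId) {
        RefCount ref = jobRefCounters.computeIfAbsent(jobId, k -> new RefCount());
        ref.references++;
        ref.keepUntil = -1;
    }

    // Drops one reference; the last release schedules a deferred cleanup.
    synchronized void releaseJob(String jobId, long nowMs) {
        RefCount ref = jobRefCounters.get(jobId);
        if (ref == null || ref.references == 0) {
            return;
        }
        if (--ref.references == 0) {
            ref.keepUntil = nowMs + cleanupIntervalMs;
        }
    }

    // Removes jobs whose grace period has expired; returns how many were removed.
    synchronized int cleanup(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
        while (it.hasNext()) {
            RefCount ref = it.next().getValue();
            if (ref.references <= 0 && ref.keepUntil > 0 && nowMs >= ref.keepUntil) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    synchronized boolean isTracked(String jobId) {
        return jobRefCounters.containsKey(jobId);
    }
}
```

A job released at time 5000 with a 1000 ms interval stays tracked until a cleanup pass runs at or after 6000, so a job that is re-registered within that window (for example during recovery) would not need its files fetched again.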





[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-20 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r139966738
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job 
ref-counting and a staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be be accessible upon recovery and do not need to 
be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
--- End diff --

The latter is not possible since each class has its uniquely offered 
methods, e.g. `TransientBlobCache` allows deleting BLOBs, while 
`PermanentBlobCache` does not. I can come up with a common base class though.


---


[jira] [Commented] (FLINK-7552) Extend SinkFunction interface with SinkContext

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173158#comment-16173158
 ] 

ASF GitHub Bot commented on FLINK-7552:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4616
  
+1 from my side to merge it like this (using a boxed long and dropping the 
`hasTimestamp()` method).

My reasoning is that this is consistent with `ProcessFunction` (as 
@EronWright said); it is also not in a hot loop and is lazily created, so I do not 
expect a big performance hit.

@aljoscha and I talked a lot about the overall design of when records have 
timestamps and whether in the future we should just assume that records always 
have timestamps. That biased me towards dropping the `hasTimestamp()` method.
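
As a rough illustration of the "boxed long, no `hasTimestamp()`" option: the context returns a `Long` that is `null` when the record carries no timestamp, so callers perform one null check instead of calling a separate query method. The interface and class names below are hypothetical stand-ins, not the API from the pull request.

```java
// Hypothetical stand-in for the sink context under discussion.
interface TimestampContextSketch {
    /** Returns the record timestamp, or null if the record has none. */
    Long timestamp();
}

class TimestampFormatterSketch {
    // A single null check replaces a hasTimestamp()/timestamp() pair.
    static String describe(TimestampContextSketch ctx) {
        Long ts = ctx.timestamp();
        return ts == null ? "no timestamp" : "timestamp=" + ts;
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> 42L));  // prints "timestamp=42"
        System.out.println(describe(() -> null)); // prints "no timestamp"
    }
}
```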


> Extend SinkFunction interface with SinkContext
> --
>
> Key: FLINK-7552
> URL: https://issues.apache.org/jira/browse/FLINK-7552
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface 
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>   /**
>* Function for standard sink behaviour. This function is called for 
> every record.
>*
>* @param value The input record.
>* @throws Exception
>* @deprecated Use {@link #invoke(SinkContext, Object)}.
>*/
>   @Deprecated
>   default void invoke(IN value) throws Exception {
>   }
>   /**
>* Writes the given value to the sink. This function is called for 
> every record.
>*
>* @param context Additional context about the input record.
>* @param value The input record.
>* @throws Exception
>*/
>   default void invoke(SinkContext context, IN value) throws Exception {
>   invoke(value);
>   }
>   /**
>* Context that {@link SinkFunction SinkFunctions } can use for getting 
> additional data about
>* an input record.
>*
>* @param  The type of elements accepted by the sink.
>*/
>   @Public // Interface might be extended in the future with additional 
> methods.
>   interface SinkContext {
>   /**
>* Returns the timestamp of the current input record.
>*/
>   long timestamp();
>   }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow 
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a 
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to 
> timestamps.
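
The backwards-compatibility argument in the proposal above rests on Java 8 default methods. The sketch below reproduces that mechanism with hypothetical names (`SinkSketch`, `ContextSketch`) rather than the real Flink interfaces: a sink written against the old single-argument method keeps working when the runtime calls the new two-argument method, and a new sink can override the two-argument method to read the timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the proposed interface shape.
interface SinkSketch<IN> {
    /** Old entry point; kept as a no-op default so new sinks need not implement it. */
    @Deprecated
    default void invoke(IN value) throws Exception {
    }

    /** New entry point; falls back to the old one so existing sinks still work. */
    default void invoke(ContextSketch context, IN value) throws Exception {
        invoke(value);
    }

    interface ContextSketch {
        long timestamp();
    }
}

// Written against the old API only.
class LegacySinkSketch implements SinkSketch<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value) {
        seen.add(value);
    }
}

// Written against the new API and uses the timestamp.
class TimestampSinkSketch implements SinkSketch<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(ContextSketch context, String value) {
        seen.add(value + "@" + context.timestamp());
    }
}
```

If the runtime only ever calls `invoke(context, value)`, the legacy sink receives the value through the default delegation and the new sink sees both the value and the timestamp.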





[GitHub] flink issue #4572: [Flink-7243] [connectors] Add ParquetInputFormat

2017-09-20 Thread mustafaakin
Github user mustafaakin commented on the issue:

https://github.com/apache/flink/pull/4572
  
Any input on this? It would be very useful, and I could get rid of Drill / Presto.


---

