[jira] [Assigned] (FLINK-7076) Implement container release to support dynamic scaling

2017-08-02 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng reassigned FLINK-7076:
--

Assignee: yuemeng

> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: yuemeng
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-08-02 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112117#comment-16112117
 ] 

yuemeng commented on FLINK-7076:


[~till.rohrmann]
To support dynamic scaling, as I understand it, we need to know when a new 
container should be allocated from the RM and when to release a container that 
has been marked as free.
This issue is about how to define a free container (TM) and how to release it from the RM.
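As a rough illustration of what I mean by releasing a free container (the class, field and map names below are hypothetical, not the actual YarnResourceManager code), stopping a worker boils down to telling YARN to stop and release the container that hosts the idle TaskManager:
{code}
// Hypothetical sketch only, not the actual Flink implementation.
import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.client.api.NMClient
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync

class ContainerReleaseSketch(
    resourceManagerClient: AMRMClientAsync[_],
    nodeManagerClient: NMClient) {

  // containers we started, keyed by a worker id
  private val workers = scala.collection.mutable.Map[String, Container]()

  /** Release the YARN container of a TaskManager that has no running tasks. */
  def stopWorker(workerId: String): Unit = {
    workers.remove(workerId).foreach { container =>
      // stop the TaskManager process and give the container back to YARN
      nodeManagerClient.stopContainer(container.getId, container.getNodeId)
      resourceManagerClient.releaseAssignedContainer(container.getId)
    }
  }
}
{code}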

> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: yuemeng
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code

2017-08-03 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng reassigned FLINK-7309:
--

Assignee: Sihua Zhou

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> -
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Liangliang Chen
>Assignee: Sihua Zhou
>Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() will cause a 
> NullPointerException when the SQL table field type is `TIMESTAMP` and the field 
> value is `null`.
> Example to reproduce:
> {code}
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // null field value
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>   Order(null, "beer", 3)))
>   
> tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
> val result = tEnv.sql("SELECT * FROM OrderA")
> result.toAppendStream[Order].print()
> 
> env.execute()
>   }
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() will generate statements 
> like this:
> {code}
> ...
>   long result$1 = 
> org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
>   boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> So the NPE occurs when in1.ts() is null.
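A fix would be to emit the null check before the conversion and only call toLong for non-null values. Below is a minimal sketch of how generation code (CodeGenUtils is written in Scala) could build the guarded statements; the method and parameter names are made up for illustration:
{code}
// Sketch only: emit the null check first, then convert only when non-null.
def generateTimestampAccess(
    inputTerm: String,   // e.g. "in1"
    fieldAccess: String, // e.g. "ts()"
    resultTerm: String,  // e.g. "result$1"
    nullTerm: String     // e.g. "isNull$2"
  ): String =
  s"""
     |boolean $nullTerm = ((java.sql.Timestamp) $inputTerm.$fieldAccess) == null;
     |long $resultTerm = $nullTerm
     |    ? -1L
     |    : org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) $inputTerm.$fieldAccess);
     |""".stripMargin
{code}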



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code

2017-08-03 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng reassigned FLINK-7309:
--

Assignee: yuemeng  (was: Sihua Zhou)

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> -
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Liangliang Chen
>Assignee: yuemeng
>Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() will cause a 
> NullPointerException when the SQL table field type is `TIMESTAMP` and the field 
> value is `null`.
> Example to reproduce:
> {code}
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // null field value
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>   Order(null, "beer", 3)))
>   
> tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
> val result = tEnv.sql("SELECT * FROM OrderA")
> result.toAppendStream[Order].print()
> 
> env.execute()
>   }
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() will generate statements 
> like this:
> {code}
> ...
>   long result$1 = 
> org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
>   boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> So the NPE occurs when in1.ts() is null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code

2017-08-03 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng reassigned FLINK-7309:
--

Assignee: (was: yuemeng)

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> -
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Liangliang Chen
>Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() will cause a 
> NullPointerException when the SQL table field type is `TIMESTAMP` and the field 
> value is `null`.
> Example to reproduce:
> {code}
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // null field value
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>   Order(null, "beer", 3)))
>   
> tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
> val result = tEnv.sql("SELECT * FROM OrderA")
> result.toAppendStream[Order].print()
> 
> env.execute()
>   }
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() will generate statements 
> like this:
> {code}
> ...
>   long result$1 = 
> org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
>   boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> So the NPE occurs when in1.ts() is null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-11057) where in grammar will cause stream inner join logical

2018-12-03 Thread yuemeng (JIRA)
yuemeng created FLINK-11057:
---

 Summary: where in grammar will cause stream inner join logical
 Key: FLINK-11057
 URL: https://issues.apache.org/jira/browse/FLINK-11057
 Project: Flink
  Issue Type: Bug
Reporter: yuemeng


{code}

select action , count ( * ) as cnt from user_action where action in ( 'view' , 
'impress' , 'sysaction' , 'commentimpress' , 'play' , 'click' , 'page' , 
'abtestreqsuss' , 'bannerimpress' , 'abtestserver' , 'active' , 'search' , 
'activeclient' , 'like' , 'zan' , 'adclick' , 'login' , 'comment' , 
'subscribeartist' , 'subscribevideo' , 'subscribedjradio' , 'share' , 'private' 
, 'register' , 'downloadall' , 'forward' , 'newdj' , 'recommendimpress' , 
'hotkeywordimpress' , 'nogetad' , 'add' , 'subscribe' , 'follow' , 'new' ) 
group by tumble ( proctime , interval '60' SECOND ) , action

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11057) where in grammar will cause stream inner join logical

2018-12-03 Thread yuemeng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-11057:

Description: 
{code:java}
select action , count ( * ) as cnt from user_action where action in ( 'view' , 
'impress' , 'sysaction' , 'commentimpress' , 'play' , 'click' , 'page' , 
'abtestreqsuss' , 'bannerimpress' , 'abtestserver' , 'active' , 'search' , 
'activeclient' , 'like' , 'zan' , 'adclick' , 'login' , 'comment' , 
'subscribeartist' , 'subscribevideo' , 'subscribedjradio' , 'share' , 'private' 
, 'register' , 'downloadall' , 'forward' , 'newdj' , 'recommendimpress' , 
'hotkeywordimpress' , 'nogetad' , 'add' , 'subscribe' , 'follow' , 'new' ) 
group by tumble ( proctime , interval '60' SECOND ) , action
{code}

A SQL statement like this produces a stream inner join in the logical plan,
but if I reduce the number of elements inside the parentheses, the inner join
is not generated.


  was:
{code}

select action , count ( * ) as cnt from user_action where action in ( 'view' , 
'impress' , 'sysaction' , 'commentimpress' , 'play' , 'click' , 'page' , 
'abtestreqsuss' , 'bannerimpress' , 'abtestserver' , 'active' , 'search' , 
'activeclient' , 'like' , 'zan' , 'adclick' , 'login' , 'comment' , 
'subscribeartist' , 'subscribevideo' , 'subscribedjradio' , 'share' , 'private' 
, 'register' , 'downloadall' , 'forward' , 'newdj' , 'recommendimpress' , 
'hotkeywordimpress' , 'nogetad' , 'add' , 'subscribe' , 'follow' , 'new' ) 
group by tumble ( proctime , interval '60' SECOND ) , action

{code}


> where in grammar will cause stream inner join logical
> -
>
> Key: FLINK-11057
> URL: https://issues.apache.org/jira/browse/FLINK-11057
> Project: Flink
>  Issue Type: Bug
>Reporter: yuemeng
>Priority: Critical
>
> {code:java}
> select action , count ( * ) as cnt from user_action where action in ( 'view' 
> , 'impress' , 'sysaction' , 'commentimpress' , 'play' , 'click' , 'page' , 
> 'abtestreqsuss' , 'bannerimpress' , 'abtestserver' , 'active' , 'search' , 
> 'activeclient' , 'like' , 'zan' , 'adclick' , 'login' , 'comment' , 
> 'subscribeartist' , 'subscribevideo' , 'subscribedjradio' , 'share' , 
> 'private' , 'register' , 'downloadall' , 'forward' , 'newdj' , 
> 'recommendimpress' , 'hotkeywordimpress' , 'nogetad' , 'add' , 'subscribe' , 
> 'follow' , 'new' ) group by tumble ( proctime , interval '60' SECOND ) , 
> action
> {code}
> A SQL statement like this produces a stream inner join in the logical plan,
> but if I reduce the number of elements inside the parentheses, the inner join
> is not generated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11057) where in grammar will cause stream inner join logical

2018-12-03 Thread yuemeng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-11057:

Description: 
{code:java}
select action , count ( * ) as cnt from user_action where action in ( 'view' , 
'impress' , 'sysaction' , 'commentimpress' , 'play' , 'click' , 'page' , 
'abtestreqsuss' , 'bannerimpress' , 'abtestserver' , 'active' , 'search' , 
'activeclient' , 'like' , 'zan' , 'adclick' , 'login' , 'comment' , 
'subscribeartist' , 'subscribevideo' , 'subscribedjradio' , 'share' , 'private' 
, 'register' , 'downloadall' , 'forward' , 'newdj' , 'recommendimpress' , 
'hotkeywordimpress' , 'nogetad' , 'add' , 'subscribe' , 'follow' , 'new' ) 
group by tumble ( proctime , interval '60' SECOND ) , action
{code}

A SQL statement like this produces a stream inner join in the logical plan,
but if I reduce the number of elements inside the parentheses, the inner join
is not generated.


  was:
{code:java}
select action , count ( * ) as cnt from user_action where action in ( 'view' , 
'impress' , 'sysaction' , 'commentimpress' , 'play' , 'click' , 'page' , 
'abtestreqsuss' , 'bannerimpress' , 'abtestserver' , 'active' , 'search' , 
'activeclient' , 'like' , 'zan' , 'adclick' , 'login' , 'comment' , 
'subscribeartist' , 'subscribevideo' , 'subscribedjradio' , 'share' , 'private' 
, 'register' , 'downloadall' , 'forward' , 'newdj' , 'recommendimpress' , 
'hotkeywordimpress' , 'nogetad' , 'add' , 'subscribe' , 'follow' , 'new' ) 
group by tumble ( proctime , interval '60' SECOND ) , action
{code}

A SQL statement like this produces a stream inner join in the logical plan,
but if I reduce the number of elements inside the parentheses, the inner join
is not generated.



> where in grammar will cause stream inner join logical
> -
>
> Key: FLINK-11057
> URL: https://issues.apache.org/jira/browse/FLINK-11057
> Project: Flink
>  Issue Type: Bug
>Reporter: yuemeng
>Priority: Critical
>
> {code:java}
> select action , count ( * ) as cnt from user_action where action in ( 'view' 
> , 'impress' , 'sysaction' , 'commentimpress' , 'play' , 'click' , 'page' , 
> 'abtestreqsuss' , 'bannerimpress' , 'abtestserver' , 'active' , 'search' , 
> 'activeclient' , 'like' , 'zan' , 'adclick' , 'login' , 'comment' , 
> 'subscribeartist' , 'subscribevideo' , 'subscribedjradio' , 'share' , 
> 'private' , 'register' , 'downloadall' , 'forward' , 'newdj' , 
> 'recommendimpress' , 'hotkeywordimpress' , 'nogetad' , 'add' , 'subscribe' , 
> 'follow' , 'new' ) group by tumble ( proctime , interval '60' SECOND ) , 
> action
> {code}
> A SQL statement like this produces a stream inner join in the logical plan,
> but if I reduce the number of elements inside the parentheses, the inner join
> is not generated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11057) where in grammar will cause stream inner join logical

2018-12-04 Thread yuemeng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708480#comment-16708480
 ] 

yuemeng commented on FLINK-11057:
-

[~twalthr]
Thanks for your reply. FLINK-10474 resolves this issue.
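For anyone hitting this before that fix is available: the IN list can be rewritten as explicit OR predicates, which stays a plain filter instead of being translated into a join with a VALUES input. A shortened sketch (only a few of the literals shown, and assuming a StreamTableEnvironment named tEnv with the user_action table registered):
{code}
// Sketch only (list of literals abbreviated): semantically equivalent rewrite
// of WHERE action IN (...) as OR predicates.
val result = tEnv.sqlQuery(
  """SELECT action, COUNT(*) AS cnt
    |FROM user_action
    |WHERE action = 'view' OR action = 'impress' OR action = 'play' OR action = 'click'
    |GROUP BY TUMBLE(proctime, INTERVAL '60' SECOND), action
    |""".stripMargin)
{code}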

> where in grammar will cause stream inner join logical
> -
>
> Key: FLINK-11057
> URL: https://issues.apache.org/jira/browse/FLINK-11057
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> {code:java}
> select action , count ( * ) as cnt from user_action where action in ( 'view' 
> , 'impress' , 'sysaction' , 'commentimpress' , 'play' , 'click' , 'page' , 
> 'abtestreqsuss' , 'bannerimpress' , 'abtestserver' , 'active' , 'search' , 
> 'activeclient' , 'like' , 'zan' , 'adclick' , 'login' , 'comment' , 
> 'subscribeartist' , 'subscribevideo' , 'subscribedjradio' , 'share' , 
> 'private' , 'register' , 'downloadall' , 'forward' , 'newdj' , 
> 'recommendimpress' , 'hotkeywordimpress' , 'nogetad' , 'add' , 'subscribe' , 
> 'follow' , 'new' ) group by tumble ( proctime , interval '60' SECOND ) , 
> action
> {code}
> A SQL statement like this produces a stream inner join in the logical plan,
> but if I reduce the number of elements inside the parentheses, the inner join
> is not generated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-6065) Make TransportClient for ES5 pluggable

2018-12-07 Thread yuemeng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng reassigned FLINK-6065:
--

Assignee: yuemeng

> Make TransportClient for ES5 pluggable
> --
>
> Key: FLINK-6065
> URL: https://issues.apache.org/jira/browse/FLINK-6065
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector, Streaming Connectors
>Reporter: Robert Metzger
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> This JIRA is based on a user request: 
> http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454
> Currently, in the {{Elasticsearch5ApiCallBridge}} the 
> {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this 
> client pluggable to allow using other clients such as the 
> {{PreBuiltXPackTransportClient}}.
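One possible shape for this (a sketch only; the trait and class names below are made up, not the connector's actual API) is a serializable factory that the connector accepts, with the current hard-coded behaviour as the default:
{code}
// Sketch only: hypothetical factory abstraction, not the actual connector API.
import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.transport.client.PreBuiltTransportClient

trait TransportClientFactory extends Serializable {
  def createClient(clientConfig: JMap[String, String]): TransportClient
}

/** Default factory that reproduces today's hard-coded behaviour. */
class PreBuiltTransportClientFactory extends TransportClientFactory {
  override def createClient(clientConfig: JMap[String, String]): TransportClient = {
    val builder = Settings.builder()
    clientConfig.asScala.foreach { case (k, v) => builder.put(k, v) }
    new PreBuiltTransportClient(builder.build())
  }
}

// Users needing X-Pack security could then plug in a factory that returns a
// PreBuiltXPackTransportClient instead.
{code}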



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-4827) Sql on streaming example use scala with wrong variable name

2016-10-13 Thread yuemeng (JIRA)
yuemeng created FLINK-4827:
--

 Summary: Sql on streaming example use scala with wrong variable 
name
 Key: FLINK-4827
 URL: https://issues.apache.org/jira/browse/FLINK-4827
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.2, 1.1.0
Reporter: yuemeng
Priority: Minor
 Fix For: 1.1.3


val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")


There is no variable named tableEnv defined here; only tEnv is defined.
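The fix is simply to use the variable the example actually defines; the two affected lines would read:
{code}
// corrected lines: use tEnv, the TableEnvironment defined above
tEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
val result = tEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{code}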



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4827) Sql on streaming example use scala with wrong variable name

2016-10-13 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4827:
---
Description: 
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")


There is no variable named tableEnv defined here,only tEnv defined here

  was:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")


There is no variable named tableEnv defined here,it's tEnv defined here


> Sql on streaming example use scala with wrong variable name
> ---
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.1.2
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.3
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4827) The example use scala of SQL on Streaming Tables with wrong variable name

2016-10-13 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4827:
---
Summary: The example use scala of SQL on Streaming Tables  with wrong 
variable name  (was: Sql on streaming example use scala with wrong variable 
name)

> The example use scala of SQL on Streaming Tables  with wrong variable name
> --
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.1.2
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.3
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4827) The code example use scala of SQL on Streaming Tables with wrong variable name in flink document

2016-10-13 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4827:
---
Summary: The code example use scala of SQL on Streaming Tables  with wrong 
variable name in flink document  (was: The example use scala of SQL on 
Streaming Tables  with wrong variable name)

> The code example use scala of SQL on Streaming Tables  with wrong variable 
> name in flink document
> -
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.1.2
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.3
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4827) The code example use scala of SQL on Streaming Tables with wrong variable name in flink document

2016-10-13 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4827:
---
Description: 
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")


There is no variable named tableEnv had defined ,Only tEnv defined here

  was:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")


There is no variable named tableEnv had defined ,only tEnv defined here


> The code example use scala of SQL on Streaming Tables  with wrong variable 
> name in flink document
> -
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.1.2
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.3
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4827) The code example use scala of SQL on Streaming Tables with wrong variable name in flink document

2016-10-13 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4827:
---
Description: 
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")


There is no variable named tableEnv had defined ,only tEnv defined here

  was:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")


There is no variable named tableEnv defined here,only tEnv defined here


> The code example use scala of SQL on Streaming Tables  with wrong variable 
> name in flink document
> -
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.1.2
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.3
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4827) The scala example of SQL on Streaming Tables with wrong variable name in flink document

2016-10-13 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4827:
---
Summary: The scala example of SQL on Streaming Tables  with wrong variable 
name in flink document  (was: The code example use scala of SQL on Streaming 
Tables  with wrong variable name in flink document)

> The scala example of SQL on Streaming Tables  with wrong variable name in 
> flink document
> 
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.1.2
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.3
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4827) The scala example of SQL on Streaming Tables with wrong variable name in flink document

2016-10-13 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4827:
---
Attachment: 0001-The-scala-example-of-SQL-on-Streaming-Tables-with-wr.patch

> The scala example of SQL on Streaming Tables  with wrong variable name in 
> flink document
> 
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.1.2
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.3
>
> Attachments: 
> 0001-The-scala-example-of-SQL-on-Streaming-Tables-with-wr.patch
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink

2016-10-21 Thread yuemeng (JIRA)
yuemeng created FLINK-4879:
--

 Summary: class KafkaTableSource should be public just like 
KafkaTableSink
 Key: FLINK-4879
 URL: https://issues.apache.org/jira/browse/FLINK-4879
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.1.3, 1.1.1
Reporter: yuemeng
Priority: Minor
 Fix For: 1.1.4


Class KafkaTableSource should be public just like KafkaTableSink. By default its 
modifier is package-private, so we cannot access it outside its own package. For 
example:
 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }


Because KafkaTableSource is package-private, we cannot declare this function's 
result type as KafkaTableSource; we have to name a specific subclass.
If some other Kafka source extends KafkaTableSource and we do not know in advance 
which subclass will be used, how can we declare the type?
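Until the class is made public, one workaround is to declare the factory's result as the public supertype instead of KafkaTableSource. A sketch of the same method with only the return type changed (this assumes KafkaTableSource implements StreamTableSource[Row], which is public):
{code}
// Sketch: same factory as above, typed against the public supertype so it
// compiles outside the connector package.
def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): StreamTableSource[Row] = {

  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}
{code}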






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink

2016-10-21 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4879:
---
Description: 
class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package,
for example:
 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }


Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?




  was:
class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package,for 
example:
 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }


Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?





> class KafkaTableSource should be public just like KafkaTableSink
> 
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1, 1.1.3
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.4
>
>
> class KafkaTableSource should be public just like KafkaTableSink,by 
> default,it's modifier is default ,and we cann't access out of it's package,
> for example:
>  def createKafkaTableSource(
>   topic: String,
>   properties: Properties,
>   deserializationSchema: DeserializationSchema[Row],
>   fieldsNames: Array[String],
>   typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
> if (deserializationSchema != null) {
>   new Kafka09TableSource(topic, properties, deserializationSchema, 
> fieldsNames, typeInfo)
> } else {
>   new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
> }
>   }
> Because of the class KafkaTableSource modifier is default,we cann't define 
> this function result type with KafkaTableSource ,we must give the specific 
> type.
> if some other kafka source extends KafkaTableSource ,and we don't sure which 
> subclass of KafkaTableSource should be use,how can we specific the type?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink

2016-10-21 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4879:
---
Description: 
class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package,
for example:


 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }





Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?




  was:
class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package,
for example:
 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }


Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?





> class KafkaTableSource should be public just like KafkaTableSink
> 
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1, 1.1.3
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.4
>
>
> class KafkaTableSource should be public just like KafkaTableSink,by 
> default,it's modifier is default ,and we cann't access out of it's package,
> for example:
> 
>  def createKafkaTableSource(
>   topic: String,
>   properties: Properties,
>   deserializationSchema: DeserializationSchema[Row],
>   fieldsNames: Array[String],
>   typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
> if (deserializationSchema != null) {
>   new Kafka09TableSource(topic, properties, deserializationSchema, 
> fieldsNames, typeInfo)
> } else {
>   new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
> }
>   }
> 
> Because of the class KafkaTableSource modifier is default,we cann't define 
> this function result type with KafkaTableSource ,we must give the specific 
> type.
> if some other kafka source extends KafkaTableSource ,and we don't sure which 
> subclass of KafkaTableSource should be use,how can we specific the type?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink

2016-10-21 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4879:
---
Description: 
*** class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package***,
for example:


 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }





Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?




  was:
class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package,
for example:


 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }





Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?





> class KafkaTableSource should be public just like KafkaTableSink
> 
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1, 1.1.3
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.4
>
>
> *** class KafkaTableSource should be public just like KafkaTableSink,by 
> default,it's modifier is default ,and we cann't access out of it's package***,
> for example:
> 
>  def createKafkaTableSource(
>   topic: String,
>   properties: Properties,
>   deserializationSchema: DeserializationSchema[Row],
>   fieldsNames: Array[String],
>   typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
> if (deserializationSchema != null) {
>   new Kafka09TableSource(topic, properties, deserializationSchema, 
> fieldsNames, typeInfo)
> } else {
>   new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
> }
>   }
> 
> Because of the class KafkaTableSource modifier is default,we cann't define 
> this function result type with KafkaTableSource ,we must give the specific 
> type.
> if some other kafka source extends KafkaTableSource ,and we don't sure which 
> subclass of KafkaTableSource should be use,how can we specific the type?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink

2016-10-21 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4879:
---
Description: 
class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package,
for example:



 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }





Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?




  was:
*** class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package***,
for example:


 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }





Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?





> class KafkaTableSource should be public just like KafkaTableSink
> 
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1, 1.1.3
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.4
>
>
> class KafkaTableSource should be public just like KafkaTableSink,by 
> default,it's modifier is default ,and we cann't access out of it's package,
> for example:
>  def createKafkaTableSource(
>   topic: String,
>   properties: Properties,
>   deserializationSchema: DeserializationSchema[Row],
>   fieldsNames: Array[String],
>   typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
> if (deserializationSchema != null) {
>   new Kafka09TableSource(topic, properties, deserializationSchema, 
> fieldsNames, typeInfo)
> } else {
>   new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
> }
>   }
> Because of the class KafkaTableSource modifier is default,we cann't define 
> this function result type with KafkaTableSource ,we must give the specific 
> type.
> if some other kafka source extends KafkaTableSource ,and we don't sure which 
> subclass of KafkaTableSource should be use,how can we specific the type?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink

2016-10-21 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4879:
---
Attachment: 0001-class-KafkaTableSource-should-be-public.patch

> class KafkaTableSource should be public just like KafkaTableSink
> 
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1, 1.1.3
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.4
>
> Attachments: 0001-class-KafkaTableSource-should-be-public.patch
>
>
> class KafkaTableSource should be public just like KafkaTableSink,by 
> default,it's modifier is default ,and we cann't access out of it's package,
> for example:
>  def createKafkaTableSource(
>   topic: String,
>   properties: Properties,
>   deserializationSchema: DeserializationSchema[Row],
>   fieldsNames: Array[String],
>   typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
> if (deserializationSchema != null) {
>   new Kafka09TableSource(topic, properties, deserializationSchema, 
> fieldsNames, typeInfo)
> } else {
>   new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
> }
>   }
> Because of the class KafkaTableSource modifier is default,we cann't define 
> this function result type with KafkaTableSource ,we must give the specific 
> type.
> if some other kafka source extends KafkaTableSource ,and we don't sure which 
> subclass of KafkaTableSource should be use,how can we specific the type?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink

2016-10-21 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-4879:
---
Description: 
*Class KafkaTableSource should be public just like KafkaTableSink. By default its 
modifier is package-private, so we cannot access it outside its own package.*
For example:



 {code}
def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }
{code}




Because KafkaTableSource is package-private, we cannot declare this function's 
result type as KafkaTableSource; we have to name a specific subclass.
If some other Kafka source extends KafkaTableSource and we do not know in advance 
which subclass will be used, how can we declare the type?




  was:
class KafkaTableSource should be public just like KafkaTableSink,by 
default,it's modifier is default ,and we cann't access out of it's package,
for example:



 def createKafkaTableSource(
  topic: String,
  properties: Properties,
  deserializationSchema: DeserializationSchema[Row],
  fieldsNames: Array[String],
  typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

if (deserializationSchema != null) {
  new Kafka09TableSource(topic, properties, deserializationSchema, 
fieldsNames, typeInfo)
} else {
  new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
}
  }





Because of the class KafkaTableSource modifier is default,we cann't define this 
function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which 
subclass of KafkaTableSource should be use,how can we specific the type?





> class KafkaTableSource should be public just like KafkaTableSink
> 
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1, 1.1.3
>Reporter: yuemeng
>Priority: Minor
> Fix For: 1.1.4
>
> Attachments: 0001-class-KafkaTableSource-should-be-public.patch
>
>
> *Class KafkaTableSource should be public just like KafkaTableSink. By default its 
> modifier is package-private, so we cannot access it outside its own package.*
> For example:
>  {code}
> def createKafkaTableSource(
>   topic: String,
>   properties: Properties,
>   deserializationSchema: DeserializationSchema[Row],
>   fieldsNames: Array[String],
>   typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
> if (deserializationSchema != null) {
>   new Kafka09TableSource(topic, properties, deserializationSchema, 
> fieldsNames, typeInfo)
> } else {
>   new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
> }
>   }
> {code}
> Because KafkaTableSource is package-private, we cannot declare this function's 
> result type as KafkaTableSource; we have to name a specific subclass.
> If some other Kafka source extends KafkaTableSource and we do not know in advance 
> which subclass will be used, how can we declare the type?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5324) JVM options will work for both YarnApplicationMasterRunner and YarnTaskManager in YARN mode

2016-12-12 Thread yuemeng (JIRA)
yuemeng created FLINK-5324:
--

 Summary: JVM options will work for both YarnApplicationMasterRunner and 
YarnTaskManager in YARN mode
 Key: FLINK-5324
 URL: https://issues.apache.org/jira/browse/FLINK-5324
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.1.3
Reporter: yuemeng
Priority: Critical


YarnApplicationMasterRunner and YarnTaskManager both use the following code to read 
their JVM options:
{code}
final String javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code}
So when we add JVM options intended for only one of them, they take effect for both.
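A sketch of the separation that would be needed: read a role-specific key first and fall back to the shared one. The role-specific key names below are an assumption for illustration, not existing ConfigConstants:
{code}
// Sketch only, reusing the flinkConfiguration shown above.
val sharedOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")

// in YarnApplicationMasterRunner:
val amJavaOpts = flinkConfiguration.getString("env.java.opts.jobmanager", sharedOpts)

// in the YarnTaskManager launch context:
val tmJavaOpts = flinkConfiguration.getString("env.java.opts.taskmanager", sharedOpts)
{code}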



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5324) JVM options will work for both YarnApplicationMasterRunner and YarnTaskManager in YARN mode

2016-12-12 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-5324:
---
Description: 
YarnApplicationMasterRunner and YarnTaskManager both use follow code to get jvm 
options
{code}
final String javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code}
so when we add some jvm options for one of them ,it will be both worked

  was:
YarnApplicationMasterRunner and YarnTaskManager both use follow code to get jvm 
options
{code}
final String javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code/}
so when we add some jvm options for one of them ,it will be both worked


> JVM options will work for both YarnApplicationMasterRunner and 
> YarnTaskManager in YARN mode
> 
>
> Key: FLINK-5324
> URL: https://issues.apache.org/jira/browse/FLINK-5324
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.1.3
>Reporter: yuemeng
>Priority: Critical
>
> YarnApplicationMasterRunner and YarnTaskManager both use the following code to 
> read their JVM options:
> {code}
> final String javaOpts = 
> flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
> {code}
> So when we add JVM options intended for only one of them, they take effect for both.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5324) JVM options will work for both YarnApplicationMasterRunner and YarnTaskManager in YARN mode

2016-12-13 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-5324:
---
Attachment: 0001-FLINK-5324-yarn-JVM-Opitons-will-work-for-both-YarnA.patch

> JVM options will work for both YarnApplicationMasterRunner and 
> YarnTaskManager in YARN mode
> 
>
> Key: FLINK-5324
> URL: https://issues.apache.org/jira/browse/FLINK-5324
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.1.3
>Reporter: yuemeng
>Priority: Critical
> Attachments: 
> 0001-FLINK-5324-yarn-JVM-Opitons-will-work-for-both-YarnA.patch
>
>
> YarnApplicationMasterRunner and YarnTaskManager both use the following code to 
> read their JVM options:
> {code}
> final String javaOpts = 
> flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
> {code}
> So when we add JVM options intended for only one of them, they take effect for both.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-9329) hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source

2018-06-07 Thread yuemeng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng closed FLINK-9329.
--
Resolution: Invalid

> hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table 
> source
> 
>
> Key: FLINK-9329
> URL: https://issues.apache.org/jira/browse/FLINK-9329
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Critical
>
> {code:java}
> Examples:
> KafkaTableSource source = Kafka010JsonTableSource.builder()
> .withSchema(TableSchema.builder()
> .field("sensorId", Types.LONG()) 
> .field("temp", Types.DOUBLE())
> .field("ptime", Types.SQL_TIMESTAMP()).build())
> .withProctimeAttribute("ptime")
> .build(); tableEnv.registerTableSource("flights", source ); {code}
> Kafka010JsonTableSource implements DefinedRowtimeAttributes, so when 
> TableSourceUtil calls hasRowtimeAttribute(source) it ends up in the following 
> code:
> {code}
> /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
> private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
>   tableSource match {
>     case r: DefinedRowtimeAttributes =>
>       r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
>     case _ =>
>       Array()
>   }
> }
> {code}
> r.getRowtimeAttributeDescriptors throws an NPE because only a proctime 
> attribute was set here.
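A defensive variant of that helper would avoid the NPE when a source only defines a proctime attribute. Sketch only (imports as in the original helper); the issue was closed as invalid, so this is illustration rather than an adopted fix:
{code}
// Sketch: treat a null descriptor list the same as "no rowtime attributes".
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
    case r: DefinedRowtimeAttributes if r.getRowtimeAttributeDescriptors != null =>
      r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
    case _ =>
      Array()
  }
}
{code}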



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-5726) Add the RocketMQ plugin for the Apache Flink

2018-10-14 Thread yuemeng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng reassigned FLINK-5726:
--

Assignee: yuemeng

> Add the RocketMQ plugin for the Apache Flink
> 
>
> Key: FLINK-5726
> URL: https://issues.apache.org/jira/browse/FLINK-5726
> Project: Flink
>  Issue Type: Task
>  Components: Streaming Connectors
>Reporter: Longda Feng
>Assignee: yuemeng
>Priority: Minor
>
> Apache RocketMQ® is an open-source distributed messaging and streaming data 
> platform that is used by many companies. Please refer to 
> http://rocketmq.incubator.apache.org/ for more details.
> Since Apache RocketMQ 4.0 will be released in the next few days, we can start 
> working on adding the RocketMQ plugin for Apache Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window

2018-04-18 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---
Summary: same merge window will be fired twice if watermark already pass 
the new merged window  (was: same merge window will fire twice if watermark 
already pass the window for merge windows)

> same merge window will be fired twice if watermark already pass the new 
> merged window
> -
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
>
> Sum with a session window. Suppose the session gap is 3 seconds and the allowed 
> lateness is 60 seconds.
> w1, TimeWindow[1,9], holds the elements 1, 2, 3, 6 and is fired once the watermark 
> reaches 9.
> A late element arrives for w2, TimeWindow[7,10], while the watermark is already at 11.
> w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is 
> registered via triggerContext.onMerge(mergedWindows). w3 is then fired via 
> triggerContext.onElement(element) because the watermark has already passed w3.
> But w3 is fired again because the merge timer is below the current watermark.
> That means w3 is fired twice because the watermark has already passed the new merged 
> window w3.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9201) same merge window will fire twice if watermark already pass the window for merge windows

2018-04-18 Thread yuemeng (JIRA)
yuemeng created FLINK-9201:
--

 Summary: same merge window will fire twice if watermark already 
pass the window for merge windows
 Key: FLINK-9201
 URL: https://issues.apache.org/jira/browse/FLINK-9201
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.3.3
Reporter: yuemeng
Assignee: yuemeng


Sum with a session window. Suppose the session gap is 3 seconds and the allowed lateness is 60 seconds.

w1, TimeWindow[1,9], holds the elements 1, 2, 3, 6 and is fired once the watermark reaches 9.

A late element arrives for w2, TimeWindow[7,10], while the watermark is already at 11.

w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered via 
triggerContext.onMerge(mergedWindows). w3 is then fired via triggerContext.onElement(element) 
because the watermark has already passed w3.

But w3 is fired again because the merge timer is below the current watermark.

That means w3 is fired twice because the watermark has already passed the new merged 
window w3.
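
A minimal sketch (an illustration of the idea, not a committed fix) of guarding the merge timer in 
EventTimeTrigger so that the merged window is not fired a second time once the watermark has 
already passed it:

{code:java}
// Sketch only: register the merge timer only while the merged window's max timestamp
// is still ahead of the current watermark. Otherwise onElement() already fires the
// window, and the stale merge timer would fire it a second time.
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
	long windowMaxTimestamp = window.maxTimestamp();
	if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
		ctx.registerEventTimeTimer(windowMaxTimestamp);
	}
}
{code}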

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window

2018-04-18 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---
Description: 
Sum with a session window. Suppose the session gap is 3 seconds and the allowed lateness is 60 seconds.

w1, TimeWindow[1,9], holds the elements 1, 2, 3, 6 and is fired once the watermark reaches 9.

A late element arrives for w2, TimeWindow[7,10], while the watermark is already at 11.

w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered via 
triggerContext.onMerge(mergedWindows). w3 is fired a first time via triggerContext.onElement(element) 
because the watermark has already passed w3.

w3 is fired a second time because the merge timer is below the current watermark.

That means w3 is fired twice because the watermark has already passed the new merged 
window w3.

 

 

  was:
sum with session window,.

suppose the session gap is 3 seconds and allowedlateness is 60 seconds

w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 9

 if a late element (w2,TimeWindow[7,10]) had come but the watermark already at 
11.

w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register a 
new timer by call triggerContext.onMerge(mergedWindows),then w3 will be fired 
later by call triggerContext.onElement(element) because of the watermark pass 
the w3.

but w3 will be fired again because of the timer < current watermark.

that mean w3 will be fired  twice because of watermark pass the new merge 
window w3.

 

 


> same merge window will be fired twice if watermark already pass the new 
> merged window
> -
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
> w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 9
>  if a late element (w2,TimeWindow[7,10]) had come but the watermark already 
> at 11.
> w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register a 
> new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3.
>  w3 will be fired second times because of the timer < current watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window

2018-04-18 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---
Description: 
Sum with a session window. Suppose the session gap is 3 seconds and the allowed lateness is 60 seconds.
 * w1, TimeWindow[1,9], holds the elements 1, 2, 3, 6 and is fired once the watermark reaches 9.
 * A late element arrives for w2, TimeWindow[7,10], while the watermark is already at 11.
 * w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered via 
triggerContext.onMerge(mergedWindows). w3 is fired a first time via triggerContext.onElement(element) 
because the watermark has already passed w3, and a second time because the merge timer is below the 
current watermark.

That means w3 is fired twice because the watermark has already passed the new merged 
window w3.

 

 

  was:
sum with session window,.suppose the session gap is 3 seconds and 
allowedlateness is 60 seconds

w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 9

 if a late element (w2,TimeWindow[7,10]) had come but the watermark already at 
11.

w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register a 
new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired first 
time by call triggerContext.onElement(element) because of the watermark pass 
the w3.

 w3 will be fired second times because of the timer < current watermark.

that mean w3 will be fired  twice because of watermark pass the new merge 
window w3.

 

 


> same merge window will be fired twice if watermark already pass the new 
> merged window
> -
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
>  * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 
> 9
>  *  if a late element (w2,TimeWindow[7,10]) had come but the watermark 
> already at 11.
>  * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register 
> a new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3. w3 will be fired second times because of the timer < current 
> watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window

2018-04-26 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---
Description: 
Sum with a session window. Suppose the session gap is 3 seconds and the allowed lateness is 60 seconds.
 * w1, TimeWindow[1,9], holds the elements 1, 2, 3, 6 and is fired once the watermark reaches 9.
 * A late element arrives for w2, TimeWindow[7,10], while the watermark is already at 11.
 * w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered via 
triggerContext.onMerge(mergedWindows). w3 is fired a first time via triggerContext.onElement(element) 
because the watermark has already passed w3, and a second time because the merge timer is below the 
current watermark.

That means w3 is fired twice because the watermark has already passed the new merged 
window w3.

Examples

{code:java}
@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
 closeCalled.set(0);

 final int sessionSize = 3;

 TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

 ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
 inputType.createSerializer(new ExecutionConfig()));

 WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
 EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 new TimeWindow.Serializer(),
 new TupleKeySelector(),
 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 stateDesc,
 new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 EventTimeTrigger.create(),
 6,
 null /* late data output tag */);

 OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 createTestHarness(operator);

 ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

 testHarness.open();

 // add elements out-of-order
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));

 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 testHarness.processWatermark(new Watermark(5500));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
 expectedOutput.add(new Watermark(5500));
 // do a snapshot, close and restore again
 OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

 TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 testHarness.close();

 testHarness = createTestHarness(operator);
 testHarness.setup();
 testHarness.initializeState(snapshot);
 testHarness.open();
 expectedOutput.clear();
 // suppose the watermark has already advanced to 1
 testHarness.processWatermark(new Watermark(1));
 // a late element with timestamp 4500 arrives; the new session window [0, 7500] is still a valid window
 // because maxTimestamp < cleanup time, so it is fired immediately
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));
 expectedOutput.add(new Watermark(1));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
 // when a new watermark arrives, the same TimeWindow [0, 7500] is fired again
 testHarness.processWatermark(new Watermark(11000));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
 expectedOutput.add(new Watermark(11000));
 TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 testHarness.close();
}
{code}

 

 

 

  was:
sum with session window,.suppose the session gap is 3 seconds and 
allowedlateness is 60 seconds
 * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 9
 *  if a late element (w2,TimeWindow[7,10]) had come but the watermark already 
at 11.
 * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register a 
new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired first 
time by call triggerContext.onElement(element) because of the watermark pass 
the w3. w3 will be fired second times because of the timer < current watermark.

that mean w3 will be fired  twice because of watermark pass the new merge 
window w3.

 

 


> same merge window will be fired twice if watermark already pass the new 
> merged window
> -
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>

[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window

2018-04-26 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---
Description: 
sum with session window,.suppose the session gap is 3 seconds and 
allowedlateness is 60 seconds
 * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 9
 *  if a late element (w2,TimeWindow[7,10]) had come but the watermark already 
at 11.
 * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register a 
new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired first 
time by call triggerContext.onElement(element) because of the watermark pass 
the w3. w3 will be fired second times because of the timer < current watermark.

that mean w3 will be fired  twice because of watermark pass the new merge 
window w3.

Examples
{code:java}
@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
 closeCalled.set(0);

 final int sessionSize = 3;

 TypeInformation> inputType = 
TypeInfoParser.parse("Tuple2");

 ListStateDescriptor> stateDesc = new 
ListStateDescriptor<>("window-contents",
 inputType.createSerializer(new ExecutionConfig()));

 WindowOperator, Iterable>, Tuple3, TimeWindow> operator = new 
WindowOperator<>(
 EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 new TimeWindow.Serializer(),
 new TupleKeySelector(),
 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 stateDesc,
 new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 EventTimeTrigger.create(),
 6,
 null /* late data output tag */);

 OneInputStreamOperatorTestHarness, Tuple3> testHarness =
 createTestHarness(operator);

 ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();

 testHarness.open();

 // add elements out-of-order
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));

 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 testHarness.processWatermark(new Watermark(5500));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
5499));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
3999));
 expectedOutput.add(new Watermark(5500));
 // do a snapshot, close and restore again
 OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

 TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 testHarness.close();

 testHarness = createTestHarness(operator);
 testHarness.setup();
 testHarness.initializeState(snapshot);
 testHarness.open();
 expectedOutput.clear();
 //suppose the watermark alread arrived 1
 testHarness.processWatermark(new Watermark(1));
 //late element with timestamp 4500 had arrived,the new session window[0, 7500] 
is still a valid window becase of maxtimestamp < cleantime
 //and fired immediately
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));
 expectedOutput.add(new Watermark(1));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
7499));
 //when a new watermark had arrived,the same TimeWindow[0, 7500] will fired 
again becase of a new timer rigsterd by call triggerOnMerge
 testHarness.processWatermark(new Watermark(11000));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
7499));
 expectedOutput.add(new Watermark(11000));
 TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 testHarness.close();
}

{code}
 

 

 

  was:
sum with session window,.suppose the session gap is 3 seconds and 
allowedlateness is 60 seconds
 * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 9
 *  if a late element (w2,TimeWindow[7,10]) had come but the watermark already 
at 11.
 * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register a 
new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired first 
time by call triggerContext.onElement(element) because of the watermark pass 
the w3. w3 will be fired second times because of the timer < current watermark.

that mean w3 will be fired  twice because of watermark pass the new merge 
window w3.

Examples

{code}

@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
 closeCalled.set(0);

 final int sessionSize = 3;

 TypeInformation> inputType = 
TypeInfoParser.parse("Tuple2");

 ListStateDescriptor> stateDesc = new 
ListStateDescriptor<>("window-contents",
 inputType.createSerializer(new ExecutionConfig()));

 WindowOperator, 

[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window

2018-04-26 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---
Description: 
sum with session window,.suppose the session gap is 3 seconds and 
allowedlateness is 60 seconds
 * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 9
 *  if a late element (w2,TimeWindow[7,10]) had come but the watermark already 
at 11.
 * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register a 
new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired first 
time by call triggerContext.onElement(element) because of the watermark pass 
the w3. w3 will be fired second times because of the timer < current watermark.

that mean w3 will be fired  twice because of watermark pass the new merge 
window w3.

Examples
{code:java}
@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
 closeCalled.set(0);

 final int sessionSize = 3;

 TypeInformation> inputType = 
TypeInfoParser.parse("Tuple2");

 ListStateDescriptor> stateDesc = new 
ListStateDescriptor<>("window-contents",
 inputType.createSerializer(new ExecutionConfig()));

 WindowOperator, Iterable>, Tuple3, TimeWindow> operator = new 
WindowOperator<>(
 EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
 new TimeWindow.Serializer(),
 new TupleKeySelector(),
 BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 stateDesc,
 new InternalIterableWindowFunction<>(new SessionWindowFunction()),
 EventTimeTrigger.create(),
 6,
 null /* late data output tag */);

 OneInputStreamOperatorTestHarness, Tuple3> testHarness =
 createTestHarness(operator);

 ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();

 testHarness.open();

 // add elements out-of-order
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));

 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
 testHarness.processWatermark(new Watermark(5500));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
5499));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
3999));
 expectedOutput.add(new Watermark(5500));
 // do a snapshot, close and restore again
 OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

 TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 testHarness.close();

 testHarness = createTestHarness(operator);
 testHarness.setup();
 testHarness.initializeState(snapshot);
 testHarness.open();
 expectedOutput.clear();
 //suppose the watermark alread arrived 1
 testHarness.processWatermark(new Watermark(1));
 //late element with timestamp 4500 had arrived,the new session window[0, 7500] 
is still a valid window becase of maxtimestamp < cleantime
 //and fired immediately
 testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));
 expectedOutput.add(new Watermark(1));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
7499));
 //when a new watermark had arrived,the same TimeWindow[0, 7500] will fired 
again becase of a new timer had rigstered by call triggerOnMerge
 testHarness.processWatermark(new Watermark(11000));
 expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
7499));
 expectedOutput.add(new Watermark(11000));
 TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 testHarness.close();
}

{code}
 

 

 

  was:
sum with session window,.suppose the session gap is 3 seconds and 
allowedlateness is 60 seconds
 * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 9
 *  if a late element (w2,TimeWindow[7,10]) had come but the watermark already 
at 11.
 * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register a 
new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired first 
time by call triggerContext.onElement(element) because of the watermark pass 
the w3. w3 will be fired second times because of the timer < current watermark.

that mean w3 will be fired  twice because of watermark pass the new merge 
window w3.

Examples
{code:java}
@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
 closeCalled.set(0);

 final int sessionSize = 3;

 TypeInformation> inputType = 
TypeInfoParser.parse("Tuple2");

 ListStateDescriptor> stateDesc = new 
ListStateDescriptor<>("window-contents",
 inputType.createSerializer(new ExecutionConfig()));

 WindowOp

[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window

2018-04-26 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---
Summary: same merge window will be fired twice if watermark already passed 
the merge window  (was: same merge window will be fired twice if watermark 
already passed the new merged window)

> same merge window will be fired twice if watermark already passed the merge 
> window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
>  * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 
> 9
>  *  if a late element (w2,TimeWindow[7,10]) had come but the watermark 
> already at 11.
>  * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register 
> a new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3. w3 will be fired second times because of the timer < current 
> watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation> inputType = 
> TypeInfoParser.parse("Tuple2");
>  ListStateDescriptor> stateDesc = new 
> ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator, Iterable Integer>>, Tuple3, TimeWindow> operator = new 
> WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  6,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness, Tuple3 Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 
> 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 
> 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 
> 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
> 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
> 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  //suppose the watermark alread arrived 1
>  testHarness.processWatermark(new Watermark(1));
>  //late element with timestamp 4500 had arrived,the new session window[0, 
> 7500] is still a valid window becase of maxtimestamp < cleantime
>  //and fired immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 
> 4500));
>  expectedOutput.add(new Watermark(1));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  //when a new watermark had arrived,the same TimeWindow[0, 7500] will fired 
> again becase of a new timer had rigstered by call triggerOnMerge
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already passed the new merged window

2018-04-26 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---
Summary: same merge window will be fired twice if watermark already passed 
the new merged window  (was: same merge window will be fired twice if watermark 
already pass the new merged window)

> same merge window will be fired twice if watermark already passed the new 
> merged window
> ---
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
>  * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 
> 9
>  *  if a late element (w2,TimeWindow[7,10]) had come but the watermark 
> already at 11.
>  * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register 
> a new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3. w3 will be fired second times because of the timer < current 
> watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation> inputType = 
> TypeInfoParser.parse("Tuple2");
>  ListStateDescriptor> stateDesc = new 
> ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator, Iterable Integer>>, Tuple3, TimeWindow> operator = new 
> WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  6,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness, Tuple3 Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 
> 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 
> 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 
> 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
> 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
> 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  //suppose the watermark alread arrived 1
>  testHarness.processWatermark(new Watermark(1));
>  //late element with timestamp 4500 had arrived,the new session window[0, 
> 7500] is still a valid window becase of maxtimestamp < cleantime
>  //and fired immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 
> 4500));
>  expectedOutput.add(new Watermark(1));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  //when a new watermark had arrived,the same TimeWindow[0, 7500] will fired 
> again becase of a new timer had rigstered by call triggerOnMerge
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window

2018-04-26 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453759#comment-16453759
 ] 

yuemeng edited comment on FLINK-9201 at 4/26/18 9:40 AM:
-

hi [~till.rohrmann]

can you help me to check this issue?

thanks a lot


was (Author: yuemeng):
hi [~till.rohrmann]

can you check this issue?

> same merge window will be fired twice if watermark already passed the merge 
> window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
>  * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 
> 9
>  *  if a late element (w2,TimeWindow[7,10]) had come but the watermark 
> already at 11.
>  * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register 
> a new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3. w3 will be fired second times because of the timer < current 
> watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation> inputType = 
> TypeInfoParser.parse("Tuple2");
>  ListStateDescriptor> stateDesc = new 
> ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator, Iterable Integer>>, Tuple3, TimeWindow> operator = new 
> WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  6,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness, Tuple3 Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 
> 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 
> 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 
> 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
> 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
> 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  //suppose the watermark alread arrived 1
>  testHarness.processWatermark(new Watermark(1));
>  //late element with timestamp 4500 had arrived,the new session window[0, 
> 7500] is still a valid window becase of maxtimestamp < cleantime
>  //and fired immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 
> 4500));
>  expectedOutput.add(new Watermark(1));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  //when a new watermark had arrived,the same TimeWindow[0, 7500] will fired 
> again becase of a new timer had rigstered by call triggerOnMerge
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window

2018-04-26 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453759#comment-16453759
 ] 

yuemeng commented on FLINK-9201:


hi [~till.rohrmann]

can you check this issue?

> same merge window will be fired twice if watermark already passed the merge 
> window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
>  * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 
> 9
>  *  if a late element (w2,TimeWindow[7,10]) had come but the watermark 
> already at 11.
>  * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register 
> a new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3. w3 will be fired second times because of the timer < current 
> watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation> inputType = 
> TypeInfoParser.parse("Tuple2");
>  ListStateDescriptor> stateDesc = new 
> ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator, Iterable Integer>>, Tuple3, TimeWindow> operator = new 
> WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  6,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness, Tuple3 Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 
> 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 
> 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 
> 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
> 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
> 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  //suppose the watermark alread arrived 1
>  testHarness.processWatermark(new Watermark(1));
>  //late element with timestamp 4500 had arrived,the new session window[0, 
> 7500] is still a valid window becase of maxtimestamp < cleantime
>  //and fired immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 
> 4500));
>  expectedOutput.add(new Watermark(1));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  //when a new watermark had arrived,the same TimeWindow[0, 7500] will fired 
> again becase of a new timer had rigstered by call triggerOnMerge
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window

2018-05-02 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460606#comment-16460606
 ] 

yuemeng commented on FLINK-9201:


[~jark],[~StephanEwen]

 

can you  check this issue?

thanks

 

> same merge window will be fired twice if watermark already passed the merge 
> window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
>  * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 
> 9
>  *  if a late element (w2,TimeWindow[7,10]) had come but the watermark 
> already at 11.
>  * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register 
> a new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3. w3 will be fired second times because of the timer < current 
> watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation> inputType = 
> TypeInfoParser.parse("Tuple2");
>  ListStateDescriptor> stateDesc = new 
> ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator, Iterable Integer>>, Tuple3, TimeWindow> operator = new 
> WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  6,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness, Tuple3 Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 
> 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 
> 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 
> 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
> 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
> 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  //suppose the watermark alread arrived 1
>  testHarness.processWatermark(new Watermark(1));
>  //late element with timestamp 4500 had arrived,the new session window[0, 
> 7500] is still a valid window becase of maxtimestamp < cleantime
>  //and fired immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 
> 4500));
>  expectedOutput.add(new Watermark(1));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  //when a new watermark had arrived,the same TimeWindow[0, 7500] will fired 
> again becase of a new timer had rigstered by call triggerOnMerge
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-7146) FLINK SQLs support DDL

2018-05-04 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng closed FLINK-7146.
--
Resolution: Duplicate

> FLINK SQLs support DDL
> --
>
> Key: FLINK-7146
> URL: https://issues.apache.org/jira/browse/FLINK-7146
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Major
>
> For now, Flink SQL does not support DDL; we can only register a table by calling 
> registerTableInternal in TableEnvironment.
> We should support DDL statements such as CREATE TABLE or CREATE FUNCTION, for example:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = 'xx:9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 
> 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 
> 'com.x.server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id
> {code}
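> For reference, a minimal sketch (not from the issue; the TableSource instance is assumed to be 
> built elsewhere in code) of the programmatic registration that such a CREATE TABLE statement 
> would replace:
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sources.TableSource;
> 
> public class RegisterSourceExample {
> 
> 	// kafkaSource is assumed to be built in code (e.g. a Kafka table source);
> 	// with DDL support the CREATE TABLE statement above would replace this step.
> 	public static void register(TableSource<?> kafkaSource) {
> 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
> 		tableEnv.registerTableSource("kafka_source", kafkaSource);
> 	}
> }
> {code}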



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window

2018-05-08 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468219#comment-16468219
 ] 

yuemeng commented on FLINK-9201:


[~aljoscha]

thanks for your suggestion

I will move the test to EventTimeTriggerTest

> same merge window will be fired twice if watermark already passed the merge 
> window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
> Fix For: 1.6.0
>
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
>  * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 
> 9
>  *  if a late element (w2,TimeWindow[7,10]) had come but the watermark 
> already at 11.
>  * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register 
> a new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3. w3 will be fired second times because of the timer < current 
> watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation> inputType = 
> TypeInfoParser.parse("Tuple2");
>  ListStateDescriptor> stateDesc = new 
> ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator, Iterable Integer>>, Tuple3, TimeWindow> operator = new 
> WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  6,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness, Tuple3 Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 
> 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 
> 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 
> 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
> 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
> 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  //suppose the watermark alread arrived 1
>  testHarness.processWatermark(new Watermark(1));
>  //late element with timestamp 4500 had arrived,the new session window[0, 
> 7500] is still a valid window becase of maxtimestamp < cleantime
>  //and fired immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 
> 4500));
>  expectedOutput.add(new Watermark(1));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  //when a new watermark had arrived,the same TimeWindow[0, 7500] will fired 
> again becase of a new timer had rigstered by call triggerOnMerge
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window

2018-05-08 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468219#comment-16468219
 ] 

yuemeng edited comment on FLINK-9201 at 5/9/18 2:12 AM:


[~aljoscha]

thanks for your suggestion

I will add a new test case to EventTimeTriggerTest, but still keep the old test in 
WindowOperatorTest because only merged windows hit the problem right now


was (Author: yuemeng):
[~aljoscha]

thanks for your suggest

i will move the test to EventTimeTriggerTest

> same merge window will be fired twice if watermark already passed the merge 
> window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.3
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Blocker
> Fix For: 1.6.0
>
>
> sum with session window,.suppose the session gap is 3 seconds and 
> allowedlateness is 60 seconds
>  * w1,TimeWindow[1,9] had elements,1,2,3,6,will be fired if watermark reached 
> 9
>  *  if a late element (w2,TimeWindow[7,10]) had come but the watermark 
> already at 11.
>  * w1,w2 will be merged a new window w3 TimeWindow[1,10] and will be register 
> a new timer by call triggerContext.onMerge(mergedWindows),w3 will be fired 
> first time by call triggerContext.onElement(element) because of the watermark 
> pass the w3. w3 will be fired second times because of the timer < current 
> watermark.
> that mean w3 will be fired  twice because of watermark pass the new merge 
> window w3.
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation> inputType = 
> TypeInfoParser.parse("Tuple2");
>  ListStateDescriptor> stateDesc = new 
> ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator, Iterable Integer>>, Tuple3, TimeWindow> operator = new 
> WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  6,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness, Tuple3 Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 
> 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 
> 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 
> 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 
> 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 
> 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  //suppose the watermark already arrived at 1
>  testHarness.processWatermark(new Watermark(1));
>  //a late element with timestamp 4500 has arrived; the new session window [0, 
> 7500] is still a valid window because maxTimestamp < cleanup time
>  //and it is fired immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 
> 4500));
>  expectedOutput.add(new Watermark(1));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  //when a new watermark arrives, the same TimeWindow[0, 7500] will be fired 
> again because a new timer was registered by the triggerContext.onMerge() call
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 
> 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
> expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  
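
The failing scenario above boils down to onMerge() unconditionally registering an event-time timer for a merged window whose end the watermark has already passed, so the window fires once from onElement() and once more from the timer. Below is a minimal sketch of a guarded trigger along the lines discussed in this thread; it is illustrative only and not necessarily the patch that was merged, and the class name is invented.

{code:java}
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Illustrative trigger: onMerge() only re-registers the merged window's timer while
// the watermark has not passed it, so a window that onElement() already fires cannot
// be fired a second time by the timer.
public class GuardedEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // watermark is already past the end of the (possibly merged) window
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        // the guard: skip the timer for a merged window the watermark already passed,
        // onElement() fires such a window exactly once in that case
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }
}
{code}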

[jira] [Created] (FLINK-9329) hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source

2018-05-10 Thread yuemeng (JIRA)
yuemeng created FLINK-9329:
--

 Summary: hasRowtimeAttribute will throw NPE if user use 
setProctimeAttribute for table source
 Key: FLINK-9329
 URL: https://issues.apache.org/jira/browse/FLINK-9329
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: yuemeng
Assignee: yuemeng


{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    // ...
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        // field "ptime" is of type SQL_TIMESTAMP
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    // declare "ptime" as processing time attribute
    .withProctimeAttribute("ptime")
    .build();

tableEnv.registerTableSource("flights", source);
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9329) hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source

2018-05-10 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9329:
---
Description: 
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG()) 
.field("temp", Types.DOUBLE())
.field("ptime", Types.SQL_TIMESTAMP()).build())
.withProctimeAttribute("ptime")
.build(); tableEnv.registerTableSource("flights", source ); {code}
{code:java}
Kafka010JsonTableSource implements DefinedRowtimeAttributes, so when
TableSourceUtil.hasRowtimeAttribute(source) is called it runs the following code:
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
case r: DefinedRowtimeAttributes =>
  r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
case _ =>
  Array()
  }
}

 r.getRowtimeAttributeDescriptors will throw an NPE because only a proctime
 attribute was configured here

{code}

  was:
{{{code}}}

{{KafkaTableSource source = Kafka010JsonTableSource.builder() // ...  
.withSchema(TableSchema.builder() .field("sensorId", Types.LONG()) 
.field("temp", Types.DOUBLE()) // field "ptime" is of type SQL_TIMESTAMP 
.field("ptime", Types.SQL_TIMESTAMP()).build()) // declare "ptime" as 
processing time attribute .withProctimeAttribute("ptime") .build();}}

tableEnv.registerTableSource("flights", kafkaTableSource);

{{{code}}}

{{ }}


> hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table 
> source
> 
>
> Key: FLINK-9329
> URL: https://issues.apache.org/jira/browse/FLINK-9329
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Critical
>
> {code:java}
> KafkaTableSource source = Kafka010JsonTableSource.builder()
> .withSchema(TableSchema.builder()
> .field("sensorId", Types.LONG()) 
> .field("temp", Types.DOUBLE())
> .field("ptime", Types.SQL_TIMESTAMP()).build())
> .withProctimeAttribute("ptime")
> .build(); tableEnv.registerTableSource("flights", source ); {code}
> {{ }}
> {code:java}
> Kafka010JsonTableSource implements DefinedRowtimeAttributes, so when 
> TableSourceUtil.hasRowtimeAttribute(source) is called it runs the following code:
> /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
> private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] 
> = {
>   tableSource match {
> case r: DefinedRowtimeAttributes =>
>   r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
> case _ =>
>   Array()
>   }
> }
>  r.getRowtimeAttributeDescriptors will throw an NPE because only a proctime 
> attribute was configured here
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9329) hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source

2018-05-10 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9329:
---
Description: 
{code:java}
Examples:
KafkaTableSource source = Kafka010JsonTableSource.builder()
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG()) 
.field("temp", Types.DOUBLE())
.field("ptime", Types.SQL_TIMESTAMP()).build())
.withProctimeAttribute("ptime")
.build(); tableEnv.registerTableSource("flights", source ); {code}
{code:java}
Kafka010JsonTableSource implements DefinedRowtimeAttributes,
so when TableSourceUtil calls hasRowtimeAttribute(source) to check, it runs the
following code:
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
case r: DefinedRowtimeAttributes =>
  r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
case _ =>
  Array()
  }
}

 r.getRowtimeAttributeDescriptors will throw an NPE because only a proctime
 attribute was configured here

{code}

  was:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
.withSchema(TableSchema.builder()
.field("sensorId", Types.LONG()) 
.field("temp", Types.DOUBLE())
.field("ptime", Types.SQL_TIMESTAMP()).build())
.withProctimeAttribute("ptime")
.build(); tableEnv.registerTableSource("flights", source ); {code}
{{ }}
{code:java}
Kafka010JsonTableSource implement the DefinedRowtimeAttributes when 
TableSourceUtil.hasRowtimeAttribute(soource) will call 
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
case r: DefinedRowtimeAttributes =>
  r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
case _ =>
  Array()
  }
}

 r.getRowtimeAttributeDescriptors will throw NPE because of we use 
ProctimeAttribute here

{code}


> hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table 
> source
> 
>
> Key: FLINK-9329
> URL: https://issues.apache.org/jira/browse/FLINK-9329
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Critical
>
> {code:java}
> Examples:
> KafkaTableSource source = Kafka010JsonTableSource.builder()
> .withSchema(TableSchema.builder()
> .field("sensorId", Types.LONG()) 
> .field("temp", Types.DOUBLE())
> .field("ptime", Types.SQL_TIMESTAMP()).build())
> .withProctimeAttribute("ptime")
> .build(); tableEnv.registerTableSource("flights", source ); {code}
> {{ }}
> {code:java}
> Kafka010JsonTableSource implements DefinedRowtimeAttributes,
> so when TableSourceUtil calls hasRowtimeAttribute(source) to check, it runs 
> the following code
> /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
> private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] 
> = {
>   tableSource match {
> case r: DefinedRowtimeAttributes =>
>   r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
> case _ =>
>   Array()
>   }
> }
>  r.getRowtimeAttributeDescriptors will throw an NPE because only a proctime 
> attribute was configured here
> {code}
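
For illustration, a null-safe variant of that lookup avoids the NPE when a source implements the interface but was configured with a proctime attribute only. The Java sketch below is an assumption for illustration (the real helper is the Scala method in TableSourceUtil shown above; the class name here is invented).

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;

// Illustrative null-safe lookup: a TableSource may implement DefinedRowtimeAttributes
// and still return null descriptors when only withProctimeAttribute(...) was used.
public final class RowtimeAttributeLookup {

    private RowtimeAttributeLookup() {}

    public static List<String> getRowtimeAttributes(TableSource<?> tableSource) {
        if (!(tableSource instanceof DefinedRowtimeAttributes)) {
            return Collections.emptyList();
        }
        List<RowtimeAttributeDescriptor> descriptors =
                ((DefinedRowtimeAttributes) tableSource).getRowtimeAttributeDescriptors();
        if (descriptors == null) {
            // e.g. Kafka010JsonTableSource built with only a proctime attribute
            return Collections.emptyList();
        }
        return descriptors.stream()
                .map(RowtimeAttributeDescriptor::getAttributeName)
                .collect(Collectors.toList());
    }
}
{code}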



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-14666) support multiple catalog in flink table sql

2019-11-07 Thread yuemeng (Jira)
yuemeng created FLINK-14666:
---

 Summary: support multiple catalog in flink table sql
 Key: FLINK-14666
 URL: https://issues.apache.org/jira/browse/FLINK-14666
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0, 1.8.2, 1.8.0
Reporter: yuemeng


currently, Calcite will only use the current catalog as the schema path to validate 
the SQL node,
maybe this is not reasonable

{code}
tableEnvironment.useCatalog("user_catalog");
tableEnvironment.useDatabase("user_db");
 Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
SECOND)"); tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
tableEnvironment.registerTable("v2", t2);
tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
action, os,cast (cnt as BIGINT) as cnt from v2");
{code}

suppose the source table music_queue_3 and the sink table kafka_table_test1 are both 
in the user_catalog catalog,
but temporary tables or views such as v1, v2, v3 are registered in the default catalog.

when we select from the temporary table v2 and insert it into our own catalog table 
database2.kafka_table_test1,
it always fails during SQL node validation, because the schema path in the
catalog reader is the current catalog without the default catalog, so the temporary 
table or view will never be identified
















--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14666) support multiple catalog in flink table sql

2019-11-07 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969842#comment-16969842
 ] 

yuemeng commented on FLINK-14666:
-

[~ykt836][~jark]
can you check this issue, thanks

> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> currently, calcite will only use the current catalog as schema path to 
> validate sql node,
> maybe this is not reasonable
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> suppose source table music_queue_3  and sink table kafka_table_test1 both in 
> user_catalog
>  catalog 
>  but some temp table or view such as v1, v2,v3 will register in default 
> catalog.
> when we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1 
> it always failed with sql node validate, because of schema path in
> catalog reader is the current catalog without default catalog,the temp table 
> or view will never be Identified



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14666) support multiple catalog in flink table sql

2019-11-07 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969842#comment-16969842
 ] 

yuemeng edited comment on FLINK-14666 at 11/8/19 5:27 AM:
--

[~ykt836][~jark]
can you check this issue? If you agree this is an issue,
can you assign it to me? I have already fixed it on the latest version


was (Author: yuemeng):
[~ykt836][~jark]
can you check this issue, thanks

> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> currently, calcite will only use the current catalog as schema path to 
> validate sql node,
> maybe this is not reasonable
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> suppose source table music_queue_3  and sink table kafka_table_test1 both in 
> user_catalog
>  catalog 
>  but some temp table or view such as v1, v2,v3 will register in default 
> catalog.
> when we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1 
> it always failed with sql node validate, because of schema path in
> catalog reader is the current catalog without default catalog,the temp table 
> or view will never be Identified



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14666) support multiple catalog in flink table sql

2019-11-08 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969951#comment-16969951
 ] 

yuemeng commented on FLINK-14666:
-

[~ykt836] but when you select some fields from a temporary table and insert them 
into our own catalog table,
because these two tables are not in the same catalog, the SQL node validation 
can't pass

> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> currently, calcite will only use the current catalog as schema path to 
> validate sql node,
> maybe this is not reasonable
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> suppose source table music_queue_3  and sink table kafka_table_test1 both in 
> user_catalog
>  catalog 
>  but some temp table or view such as v1, v2,v3 will register in default 
> catalog.
> when we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1 
> it always failed with sql node validate, because of schema path in
> catalog reader is the current catalog without default catalog,the temp table 
> or view will never be Identified



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14666) support multiple catalog in flink table sql

2019-11-08 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16970052#comment-16970052
 ] 

yuemeng commented on FLINK-14666:
-

[~dwysakowicz] thanks for your reply
 The fully qualified name works well,
but for Flink 1.9.0, temporary tables also need to be fully qualified, such as
{code}
tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from 
default_catalog.default_database.v1");
{code}
because the built-in catalog is named default_catalog and all temporary views are 
registered to it.
this may no longer be a problem after FLIP-64
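
For completeness, a self-contained sketch of that workaround follows (Flink 1.9-era Table API as an assumption; the catalog, database, and table names come from the example in this issue and are assumed to exist and be registered beforehand):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Sketch of the fully-qualified-name workaround: the physical tables live in
// "user_catalog" (assumed to be registered already), while views created via
// registerTable() land in the built-in default_catalog and must be addressed as such.
public class MultiCatalogWorkaround {

    public static void main(String[] args) {
        TableEnvironment tableEnvironment =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        tableEnvironment.useCatalog("user_catalog"); // registered beforehand (not shown)
        tableEnvironment.useDatabase("user_db");

        Table table = tableEnvironment.sqlQuery(
                "SELECT action, os, count(*) AS cnt FROM music_queue_3 "
                        + "GROUP BY action, os, tumble(proctime, INTERVAL '10' SECOND)");

        // registerTable() puts the view into default_catalog.default_database, not user_catalog
        tableEnvironment.registerTable("v1", table);

        // so later queries must use the fully qualified name of the view
        Table t2 = tableEnvironment.sqlQuery(
                "SELECT action, os, 1 AS cnt FROM default_catalog.default_database.v1");
        tableEnvironment.registerTable("v2", t2);

        tableEnvironment.sqlUpdate(
                "INSERT INTO database2.kafka_table_test1 "
                        + "SELECT action, os, CAST(cnt AS BIGINT) AS cnt "
                        + "FROM default_catalog.default_database.v2");
    }
}
{code}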


> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> currently, calcite will only use the current catalog as schema path to 
> validate sql node,
> maybe this is not reasonable
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> suppose source table music_queue_3  and sink table kafka_table_test1 both in 
> user_catalog
>  catalog 
>  but some temp table or view such as v1, v2,v3 will register in default 
> catalog.
> when we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1 
> it always failed with sql node validate, because of schema path in
> catalog reader is the current catalog without default catalog,the temp table 
> or view will never be Identified



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14666) support multiple catalog in flink table sql

2019-11-08 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16970052#comment-16970052
 ] 

yuemeng edited comment on FLINK-14666 at 11/8/19 10:40 AM:
---

[~dwysakowicz] thanks for your reply
 The fully qualified name works well,
but for Flink 1.9.0, temporary tables also need to be fully qualified, such as
{code}
tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from 
default_catalog.default_database.v1");
{code}
because the built-in catalog is named default_catalog and all temporary views are 
registered to it.
this may no longer be a problem after FLIP-64



was (Author: yuemeng):
[~dwysakowicz] thanks for your reply
 The fully qualified name will work well
but for flink 1.9.0,temporary tables will also need fully qualified such as
{code}
tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from 
default_catalog.default_database.v1");
{code}
because of buildin catalog named default_catalog.and all temporary view 
register to it.
there may be not a problem after FLIP-64


> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> currently, calcite will only use the current catalog as schema path to 
> validate sql node,
> maybe this is not reasonable
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> suppose source table music_queue_3  and sink table kafka_table_test1 both in 
> user_catalog
>  catalog 
>  but some temp table or view such as v1, v2,v3 will register in default 
> catalog.
> when we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1 
> it always failed with sql node validate, because of schema path in
> catalog reader is the current catalog without default catalog,the temp table 
> or view will never be Identified



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14666) support multiple catalog in flink table sql

2019-11-10 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16971278#comment-16971278
 ] 

yuemeng commented on FLINK-14666:
-

[~dwysakowicz]  I will close the issue because of your new feature in the 
latest master

> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> currently, calcite will only use the current catalog as schema path to 
> validate sql node,
> maybe this is not reasonable
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> suppose source table music_queue_3  and sink table kafka_table_test1 both in 
> user_catalog
>  catalog 
>  but some temp table or view such as v1, v2,v3 will register in default 
> catalog.
> when we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1 
> it always failed with sql node validate, because of schema path in
> catalog reader is the current catalog without default catalog,the temp table 
> or view will never be Identified



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14666) support multiple catalog in flink table sql

2019-11-10 Thread yuemeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng closed FLINK-14666.
---
Resolution: Implemented

> support multiple catalog in flink table sql
> ---
>
> Key: FLINK-14666
> URL: https://issues.apache.org/jira/browse/FLINK-14666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1
>Reporter: yuemeng
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> currently, calcite will only use the current catalog as schema path to 
> validate sql node,
> maybe this is not reasonable
> {code}
> tableEnvironment.useCatalog("user_catalog");
> tableEnvironment.useDatabase("user_db");
>  Table table = tableEnvironment.sqlQuery("SELECT action, os,count(*) as cnt 
> from music_queue_3 group by action, os,tumble(proctime, INTERVAL '10' 
> SECOND)"); tableEnvironment.registerTable("v1", table);
> Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
> tableEnvironment.registerTable("v2", t2);
> tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT 
> action, os,cast (cnt as BIGINT) as cnt from v2");
> {code}
> suppose source table music_queue_3  and sink table kafka_table_test1 both in 
> user_catalog
>  catalog 
>  but some temp table or view such as v1, v2,v3 will register in default 
> catalog.
> when we select temp table v2 and insert it into our own catalog table 
> database2.kafka_table_test1 
> it always failed with sql node validate, because of schema path in
> catalog reader is the current catalog without default catalog,the temp table 
> or view will never be Identified



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode

2021-01-11 Thread yuemeng (Jira)
yuemeng created FLINK-20935:
---

 Summary: can't write flink configuration to tmp file and add it to 
local resource in yarn session mode
 Key: FLINK-20935
 URL: https://issues.apache.org/jira/browse/FLINK-20935
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.12.0, 1.13.0
Reporter: yuemeng


In Flink 1.12.0 or the latest version, when we execute a command such as 
bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st, the deployment fails with the 
following errors:
{code}
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn session cluster
at 
org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
Caused by: java.io.FileNotFoundException: File does not exist: 
/tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
{code}

when the startAppMaster method in YarnClusterDescriptor is called, it tries to 
write the flink configuration to a tmp file and add it to the local resources, but 
the following code causes the tmp file path to be resolved against a distributed file system
{code}
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = null;
try {
tmpConfigurationFile = File.createTempFile(appId + 
"-flink-conf.yaml", null);
BootstrapTools.writeConfiguration(configuration, 
tmpConfigurationFile);

String flinkConfigKey = "flink-conf.yaml";
fileUploader.registerSingleLocalResource(
flinkConfigKey,
new 
Path(tmpConfigurationFile.getAbsolutePath()),
"",
LocalResourceType.FILE,
true,
true);

classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
if (tmpConfigurationFile != null && 
!tmpConfigurationFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", 
tmpConfigurationFile.toPath());
}
}
{code}

the {{tmpConfigurationFile.getAbsolutePath()}} method returns a 
path without a file scheme, so the file system will be resolved as a distributed 
file system







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode

2021-01-12 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263861#comment-17263861
 ] 

yuemeng commented on FLINK-20935:
-

[~fly_in_gis], yes, I set fs.default-scheme to hdfs, but creating the tmp file for 
the flink configuration and adding it to the local resources does not need to rely 
on this configuration, just like the way we handle the jobGraph.

{code}

// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
 File tmpJobGraphFile = null;
 try {
 tmpJobGraphFile = File.createTempFile(appId.toString(), null);
 try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
 ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
 obOutput.writeObject(jobGraph);
 }

 final String jobGraphFilename = "job.graph";
 configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);

 fileUploader.registerSingleLocalResource(
 jobGraphFilename,
 new Path(tmpJobGraphFile.toURI()),
 "",
 LocalResourceType.FILE,
 true,
 false);
 classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
 } catch (Exception e) {
 LOG.warn("Add job graph to local resource fail.");
 throw e;
 } finally {
 if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
 LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
 }
 }
}{code}
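
Building on the jobGraph handling above, a minimal sketch of the same idea applied to the flink-conf.yaml temp file is shown below; it is an illustration of the direction discussed here, not necessarily the merged patch, and the helper class name is invented.

{code:java}
import java.io.File;
import java.io.IOException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.hadoop.fs.Path;

// Illustrative fix direction: build the Path from the temp file's URI so it keeps the
// local file:// scheme and is not resolved against fs.default-scheme (i.e. HDFS).
final class LocalFlinkConfPath {

    private LocalFlinkConfPath() {}

    static Path writeFlinkConfToTempFile(String appId, Configuration configuration) throws IOException {
        File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
        BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
        // tmpConfigurationFile.toURI() yields file:/..., unlike getAbsolutePath()
        return new Path(tmpConfigurationFile.toURI());
    }
}
{code}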

> can't write flink configuration to tmp file and add it to local resource in 
> yarn session mode
> -
>
> Key: FLINK-20935
> URL: https://issues.apache.org/jira/browse/FLINK-20935
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.12.0, 1.13.0
>Reporter: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> In flink 1.12.0 or lastest version,when we execute command such as 
> bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st,the depoy will be failed with 
> follow errors:
> {code}
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
> Caused by: java.io.FileNotFoundException: File does not exist: 
> /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> {code}
> when we called startAppMaster method in YarnClusterDescriptor,it will be try 
> to write flink configuration to tmp file and add it to local resource. but 
> the follow code will make the tmp file system as a distribute file system
> {code}
> // Upload the flink configuration
>   // write out configuration file
>   File tmpConfigurationFile = null;
>   try {
>   tmpConfigurationFile = File.createTempFile(appId + 
> "-flink-conf.yaml", null);
>   BootstrapTools.writeConfiguration(configuration, 
> tmpConfigurationFile);
>   String flinkConfigKey = "flink-conf.yaml";
>   fileUploader.registerSingleLocalResource(
>   flinkConfigKey,
>   new 
> Path(tmpConfigurationFile.getAbsolutePath()),
>   "",
>   LocalResourceType.FILE,
>   true,
>   true);
>   
> classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
>   } finally {
>   if (tmpConfigurationFile != null && 
> !tmpConfigurationFile.delete()) {
>   LOG.warn("Fail to delete temporary file {}.", 
> tmpConfigurationFile.toPath());
>   }
>   }
> {code}
> {code} tmpConfigurationFile.getAbsolutePath() {code} method will be return a 
> path without file schema and the file system will be considered as a 
> distribute file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode

2021-01-13 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264565#comment-17264565
 ] 

yuemeng commented on FLINK-20935:
-

[~trohrmann] [~fly_in_gis], yes, this is a good way to fix this problem. I will 
fix it this way

> can't write flink configuration to tmp file and add it to local resource in 
> yarn session mode
> -
>
> Key: FLINK-20935
> URL: https://issues.apache.org/jira/browse/FLINK-20935
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.12.0, 1.13.0
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> In flink 1.12.0 or lastest version,when we execute command such as 
> bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st,the depoy will be failed with 
> follow errors:
> {code}
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
> Caused by: java.io.FileNotFoundException: File does not exist: 
> /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> {code}
> when we called startAppMaster method in YarnClusterDescriptor,it will be try 
> to write flink configuration to tmp file and add it to local resource. but 
> the follow code will make the tmp file system as a distribute file system
> {code}
> // Upload the flink configuration
>   // write out configuration file
>   File tmpConfigurationFile = null;
>   try {
>   tmpConfigurationFile = File.createTempFile(appId + 
> "-flink-conf.yaml", null);
>   BootstrapTools.writeConfiguration(configuration, 
> tmpConfigurationFile);
>   String flinkConfigKey = "flink-conf.yaml";
>   fileUploader.registerSingleLocalResource(
>   flinkConfigKey,
>   new 
> Path(tmpConfigurationFile.getAbsolutePath()),
>   "",
>   LocalResourceType.FILE,
>   true,
>   true);
>   
> classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
>   } finally {
>   if (tmpConfigurationFile != null && 
> !tmpConfigurationFile.delete()) {
>   LOG.warn("Fail to delete temporary file {}.", 
> tmpConfigurationFile.toPath());
>   }
>   }
> {code}
> {code} tmpConfigurationFile.getAbsolutePath() {code} method will be return a 
> path without file schema and the file system will be considered as a 
> distribute file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode

2021-01-13 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264565#comment-17264565
 ] 

yuemeng edited comment on FLINK-20935 at 1/14/21, 3:02 AM:
---

[~trohrmann] [~fly_in_gis], yes, I agree. I will check all the calls of 
{{registerSingleLocalResource}} to make sure that the local path is qualified


was (Author: yuemeng):
[~trohrmann] [~fly_in_gis],yes,this is a good way to fixed this problem. I will 
fixed in this way

> can't write flink configuration to tmp file and add it to local resource in 
> yarn session mode
> -
>
> Key: FLINK-20935
> URL: https://issues.apache.org/jira/browse/FLINK-20935
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.12.0, 1.13.0
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> In flink 1.12.0 or lastest version,when we execute command such as 
> bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st,the depoy will be failed with 
> follow errors:
> {code}
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
> Caused by: java.io.FileNotFoundException: File does not exist: 
> /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> {code}
> when we called startAppMaster method in YarnClusterDescriptor,it will be try 
> to write flink configuration to tmp file and add it to local resource. but 
> the follow code will make the tmp file system as a distribute file system
> {code}
> // Upload the flink configuration
>   // write out configuration file
>   File tmpConfigurationFile = null;
>   try {
>   tmpConfigurationFile = File.createTempFile(appId + 
> "-flink-conf.yaml", null);
>   BootstrapTools.writeConfiguration(configuration, 
> tmpConfigurationFile);
>   String flinkConfigKey = "flink-conf.yaml";
>   fileUploader.registerSingleLocalResource(
>   flinkConfigKey,
>   new 
> Path(tmpConfigurationFile.getAbsolutePath()),
>   "",
>   LocalResourceType.FILE,
>   true,
>   true);
>   
> classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
>   } finally {
>   if (tmpConfigurationFile != null && 
> !tmpConfigurationFile.delete()) {
>   LOG.warn("Fail to delete temporary file {}.", 
> tmpConfigurationFile.toPath());
>   }
>   }
> {code}
> {code} tmpConfigurationFile.getAbsolutePath() {code} method will be return a 
> path without file schema and the file system will be considered as a 
> distribute file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode

2021-01-17 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264565#comment-17264565
 ] 

yuemeng edited comment on FLINK-20935 at 1/18/21, 2:44 AM:
---

[~trohrmann] [~fly_in_gis], yes, I agree. I will check all the calls of 
{{registerSingleLocalResource}} to make sure that the local path is qualified

[~trohrmann] [~fly_in_gis], I have checked all the places that call 
{{registerSingleLocalResource}}; for now, only this place can cause the 
problem, because the other local files always have a file scheme (file:///)


was (Author: yuemeng):
[~trohrmann] [~fly_in_gis],yes,I agree that,I will check all the call of 
{{registerSingleLocalResource}} to make sure that the local path is qualified

> can't write flink configuration to tmp file and add it to local resource in 
> yarn session mode
> -
>
> Key: FLINK-20935
> URL: https://issues.apache.org/jira/browse/FLINK-20935
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.12.0, 1.13.0
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> In flink 1.12.0 or lastest version,when we execute command such as 
> bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st,the depoy will be failed with 
> follow errors:
> {code}
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
> Caused by: java.io.FileNotFoundException: File does not exist: 
> /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> {code}
> when we called startAppMaster method in YarnClusterDescriptor,it will be try 
> to write flink configuration to tmp file and add it to local resource. but 
> the follow code will make the tmp file system as a distribute file system
> {code}
> // Upload the flink configuration
>   // write out configuration file
>   File tmpConfigurationFile = null;
>   try {
>   tmpConfigurationFile = File.createTempFile(appId + 
> "-flink-conf.yaml", null);
>   BootstrapTools.writeConfiguration(configuration, 
> tmpConfigurationFile);
>   String flinkConfigKey = "flink-conf.yaml";
>   fileUploader.registerSingleLocalResource(
>   flinkConfigKey,
>   new 
> Path(tmpConfigurationFile.getAbsolutePath()),
>   "",
>   LocalResourceType.FILE,
>   true,
>   true);
>   
> classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
>   } finally {
>   if (tmpConfigurationFile != null && 
> !tmpConfigurationFile.delete()) {
>   LOG.warn("Fail to delete temporary file {}.", 
> tmpConfigurationFile.toPath());
>   }
>   }
> {code}
> {code} tmpConfigurationFile.getAbsolutePath() {code} method will be return a 
> path without file schema and the file system will be considered as a 
> distribute file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode

2021-01-20 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268446#comment-17268446
 ] 

yuemeng commented on FLINK-20935:
-

[~trohrmann], yes, JerryTaoTao is me. I will take a look at your comment.

[~fly_in_gis], if we create a local file, we can't treat it as a distributed file 
path. Any operation like 
path.getFileSystem(yarnConfiguration).makeQualified(path) or 
fileSystem.makeQualified(resourcePath)
will only turn it into a qualified distributed file path, but in fact it's just a 
local file that is uploaded to HDFS and downloaded to the local disk by the container

> can't write flink configuration to tmp file and add it to local resource in 
> yarn session mode
> -
>
> Key: FLINK-20935
> URL: https://issues.apache.org/jira/browse/FLINK-20935
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.12.0, 1.13.0
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> In flink 1.12.0 or lastest version,when we execute command such as 
> bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st,the depoy will be failed with 
> follow errors:
> {code}
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
> Caused by: java.io.FileNotFoundException: File does not exist: 
> /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> {code}
> when we called startAppMaster method in YarnClusterDescriptor,it will be try 
> to write flink configuration to tmp file and add it to local resource. but 
> the follow code will make the tmp file system as a distribute file system
> {code}
> // Upload the flink configuration
>   // write out configuration file
>   File tmpConfigurationFile = null;
>   try {
>   tmpConfigurationFile = File.createTempFile(appId + 
> "-flink-conf.yaml", null);
>   BootstrapTools.writeConfiguration(configuration, 
> tmpConfigurationFile);
>   String flinkConfigKey = "flink-conf.yaml";
>   fileUploader.registerSingleLocalResource(
>   flinkConfigKey,
>   new 
> Path(tmpConfigurationFile.getAbsolutePath()),
>   "",
>   LocalResourceType.FILE,
>   true,
>   true);
>   
> classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
>   } finally {
>   if (tmpConfigurationFile != null && 
> !tmpConfigurationFile.delete()) {
>   LOG.warn("Fail to delete temporary file {}.", 
> tmpConfigurationFile.toPath());
>   }
>   }
> {code}
> {code} tmpConfigurationFile.getAbsolutePath() {code} method will be return a 
> path without file schema and the file system will be considered as a 
> distribute file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode

2021-01-21 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269941#comment-17269941
 ] 

yuemeng commented on FLINK-20935:
-

[~trohrmann], I have updated the PR, can you review it? Thanks

> can't write flink configuration to tmp file and add it to local resource in 
> yarn session mode
> -
>
> Key: FLINK-20935
> URL: https://issues.apache.org/jira/browse/FLINK-20935
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.12.0, 1.13.0
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> In flink 1.12.0 or lastest version,when we execute command such as 
> bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st,the depoy will be failed with 
> follow errors:
> {code}
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
> Caused by: java.io.FileNotFoundException: File does not exist: 
> /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
> {code}
> when we called startAppMaster method in YarnClusterDescriptor,it will be try 
> to write flink configuration to tmp file and add it to local resource. but 
> the follow code will make the tmp file system as a distribute file system
> {code}
> // Upload the flink configuration
>   // write out configuration file
>   File tmpConfigurationFile = null;
>   try {
>   tmpConfigurationFile = File.createTempFile(appId + 
> "-flink-conf.yaml", null);
>   BootstrapTools.writeConfiguration(configuration, 
> tmpConfigurationFile);
>   String flinkConfigKey = "flink-conf.yaml";
>   fileUploader.registerSingleLocalResource(
>   flinkConfigKey,
>   new 
> Path(tmpConfigurationFile.getAbsolutePath()),
>   "",
>   LocalResourceType.FILE,
>   true,
>   true);
>   
> classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
>   } finally {
>   if (tmpConfigurationFile != null && 
> !tmpConfigurationFile.delete()) {
>   LOG.warn("Fail to delete temporary file {}.", 
> tmpConfigurationFile.toPath());
>   }
>   }
> {code}
> {code} tmpConfigurationFile.getAbsolutePath() {code} method will be return a 
> path without file schema and the file system will be considered as a 
> distribute file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-10-30 Thread yuemeng (Jira)
yuemeng created FLINK-29801:
---

 Summary: OperatorCoordinator need open the way to operate 
metricGroup interface
 Key: FLINK-29801
 URL: https://issues.apache.org/jira/browse/FLINK-29801
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: yuemeng


Currently, we have no way to get a metric group instance in an OperatorCoordinator

In some cases we may want to report metrics from the OperatorCoordinator, for example 
in the Flink-Hudi integration scenario, where some metadata is sent to the operator 
coordinator to be committed to HDFS or HMS,

but we also need to report some metrics from the operator coordinator for monitoring 
purposes
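
As a usage sketch of what this would enable: the {{coordinatorMetricGroup}} handle below is hypothetical, it is precisely the kind of accessor this ticket asks to expose and does not exist in the current OperatorCoordinator interface; the metric name is illustrative.

{code:java}
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical usage: `coordinatorMetricGroup` stands for whatever handle the proposed
// API would hand to an OperatorCoordinator implementation.
public class CoordinatorCommitMetrics {

    private final AtomicLong completedCommits = new AtomicLong();

    public CoordinatorCommitMetrics(MetricGroup coordinatorMetricGroup) {
        // expose the number of completed commits as a gauge
        coordinatorMetricGroup.gauge("completedCommits", (Gauge<Long>) completedCommits::get);
    }

    // called by the coordinator after a commit to HDFS/HMS succeeds
    public void onCommitCompleted() {
        completedCommits.incrementAndGet();
    }
}
{code}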



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-10-30 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626419#comment-17626419
 ] 

yuemeng commented on FLINK-29801:
-

[~jark] [~rmetzger]  Can you review this improvement? We really need it 
in Hudi to report some metrics

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, We have no way to get metric group instances in OperatorCoordinator
> In some cases, we may report some metric in OperatorCoordinator such as Flink 
> hudi integrate scene, some meta will send to operator coordinator to commit 
> to hdfs or hms
> but we also need to report some metrics in operator coordinator for monitor 
> purpose



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-10-30 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626423#comment-17626423
 ] 

yuemeng commented on FLINK-29801:
-

[~danny0405] Can you review this improvement? We need it to report some 
metrics in the stream write coordinator

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, We have no way to get metric group instances in OperatorCoordinator
> In some cases, we may report some metric in OperatorCoordinator such as Flink 
> hudi integrate scene, some meta will send to operator coordinator to commit 
> to hdfs or hms
> but we also need to report some metrics in operator coordinator for monitor 
> purpose



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-17 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635341#comment-17635341
 ] 

yuemeng commented on FLINK-29801:
-

[~ruanhang1993] [~zhuzh] 

Firstly, thanks a lot for your reply

The following answers your 2 questions
 # Each implementation of OperatorCoordinator will hold the metric group 
instance and can register whatever it wants on it. This feature is not 
only for SourceCoordinator; e.g., what about the other operator coordinators that 
don't have a context?
 # Each operator coordinator's metric group is distinguished by its metric group 
name

Secondly

This implementation already works in our production env; if we have or want a new 
design, we can push it forward and finish it quickly because we really need it

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, We have no way to get metric group instances in OperatorCoordinator
> In some cases, we may report some metric in OperatorCoordinator such as Flink 
> hudi integrate scene, some meta will send to operator coordinator to commit 
> to hdfs or hms
> but we also need to report some metrics in operator coordinator for monitor 
> purpose



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-20 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636464#comment-17636464
 ] 

yuemeng commented on FLINK-29801:
-

Hi [~ruanhang1993], [~zhuzh]

Further consideration is good; I am very happy to hear that and look forward to this 
FLIP. I will follow its progress, and it would be even better if I could help finish it. 
If any part needs me to join in, just let me know.

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an OperatorCoordinator.
> In some cases we need to report metrics from the OperatorCoordinator, for example in the
> Flink/Hudi integration scenario, where metadata is sent to the operator coordinator to be
> committed to HDFS or HMS, and we also need to report metrics from the operator
> coordinator for monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-23 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638122#comment-17638122
 ] 

yuemeng commented on FLINK-29801:
-

[~ruanhang1993] I will review this FLIP later and look forward to working with you on it. 
Could you tell me your Slack or DingTalk ID?

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an OperatorCoordinator.
> In some cases we need to report metrics from the OperatorCoordinator, for example in the
> Flink/Hudi integration scenario, where metadata is sent to the operator coordinator to be
> committed to HDFS or HMS, and we also need to report metrics from the operator
> coordinator for monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-24 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638202#comment-17638202
 ] 

yuemeng commented on FLINK-29801:
-

[~ruanhang1993] Here is my email: 272614...@qq.com

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an OperatorCoordinator.
> In some cases we need to report metrics from the OperatorCoordinator, for example in the
> Flink/Hudi integration scenario, where metadata is sent to the operator coordinator to be
> committed to HDFS or HMS, and we also need to report metrics from the operator
> coordinator for monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)
yuemeng created FLINK-7145:
--

 Summary: Flink SQL API should support multiple parameters for 
UserDefinedAggFunction
 Key: FLINK-7145
 URL: https://issues.apache.org/jira/browse/FLINK-7145
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: yuemeng
Priority: Critical


UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one 
parameter; we should make Flink SQL support this.
Once Flink SQL supports DML and multi-parameter UDAFs, we can execute SQL like:
{code}
CREATE TEMPORARY function 'TOPK' AS 
'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
BY id;
{code}
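To make the multi-parameter case concrete, here is a rough sketch of such a UDAF written against the Table API's {{AggregateFunction}}. The class is made up for illustration: it only keeps the k largest values and ignores the order argument that a real implementation would honour.
{code}
import java.util.Comparator;
import java.util.PriorityQueue;
import org.apache.flink.table.functions.AggregateFunction;

// Sketch of a top-k UDAF whose accumulate method takes extra parameters
// (k and sort order) besides the aggregated value, as in TOPK(price, 5, 'DESC').
public class TopKFunction extends AggregateFunction<int[], PriorityQueue<Integer>> {

    @Override
    public PriorityQueue<Integer> createAccumulator() {
        return new PriorityQueue<>();
    }

    // Multi-parameter accumulate: the value plus k and the (ignored) order flag.
    public void accumulate(PriorityQueue<Integer> acc, int value, int k, String order) {
        acc.add(value);
        while (acc.size() > k) {
            acc.poll(); // min-heap: evict the smallest, keep the k largest
        }
    }

    @Override
    public int[] getValue(PriorityQueue<Integer> acc) {
        return acc.stream()
                .sorted(Comparator.reverseOrder())
                .mapToInt(Integer::intValue)
                .toArray();
    }
}
{code}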



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7146) FLINK SQLs support DDL

2017-07-11 Thread yuemeng (JIRA)
yuemeng created FLINK-7146:
--

 Summary: FLINK SQLs support DDL
 Key: FLINK-7146
 URL: https://issues.apache.org/jira/browse/FLINK-7146
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: yuemeng


For now, Flink SQL does not support DDL; we can only register a table by calling 
registerTableInternal in TableEnvironment.
We should support DDL statements such as creating a table or creating a function, for example:
{code}

CREATE TABLE kafka_source (
  id INT,
  price INT
) PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = 'xx:9092',
  group_id = 'test'
);

CREATE TABLE db_sink (
  id INT,
  price DOUBLE
) PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'udaf_test',
  url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

CREATE TEMPORARY function 'AVGUDAF' AS 
'com.x.server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';

INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id


{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081847#comment-16081847
 ] 

yuemeng edited comment on FLINK-7145 at 7/11/17 8:20 AM:
-

[~fhueske]
Thanks a lot for the reply.
Some days ago, when I tried to make Flink UDAFs support multiple parameters, I ran into 
a problem.
Suppose we register more than one function with the same name, topK, in the schema.
1) Two functions with the same name:
Top10(INT,INT)
Top10(INT,DOUBLE)
2) Execute the SQL:
{code}
SELECT TUMBLE_END(rowtime, INTERVAL '10' MINUTE), top10(cnt, page) AS topPages
FROM pageVisits 
GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE);
{code}
Suppose cnt is INT and page is DOUBLE; for now Calcite can't find the exact function.
3) You can check this issue:
[https://issues.apache.org/jira/browse/CALCITE-1833]



was (Author: yuemeng):
[~fhueske]
Thanks a lot for the reply.
Some days ago, when I tried to make Flink UDAFs support multiple parameters, I ran into 
a problem.
Suppose we register more than one function with the same name, topK, in the schema.
1) Two functions with the same name:
Top10(INT,INT)
Top10(INT,DOUBLE)
2) Execute the SQL:
{code}
SELECT TUMBLE_END(rowtime, INTERVAL '10' MINUTE), top10(cnt, page) AS topPages
FROM pageVisits 
GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE);
{code}
Suppose cnt is INT and page is DOUBLE; for now Calcite can't find the exact function.
3) You can check this issue:
[https://issues.apache.org/jira/browse/CALCITE-1833]


> Flink SQL API should support multiple parameters for UserDefinedAggFunction
> ---
>
> Key: FLINK-7145
> URL: https://issues.apache.org/jira/browse/FLINK-7145
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one
> parameter; we should make Flink SQL support this.
> Once Flink SQL supports DML and multi-parameter UDAFs, we can execute SQL like:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7148) Flink SQL API support DDL

2017-07-11 Thread yuemeng (JIRA)
yuemeng created FLINK-7148:
--

 Summary: Flink SQL API support  DDL
 Key: FLINK-7148
 URL: https://issues.apache.org/jira/browse/FLINK-7148
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: yuemeng


For now, Flink SQL does not support DDL operations; a user can only register a table by 
calling registerTableInternal in TableEnvironment. We should support DDL such as 
creating a table or creating a function, for example:
{code}
CREATE TABLE kafka_source (
  id INT,
  price INT
) PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = ':9092',
  group_id = 'test'
);

CREATE TABLE db_sink (
  id INT,
  price DOUBLE
) PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'udaf_test',
  url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

CREATE TEMPORARY function 'AVGUDAF' AS 
'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id

{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7148) Flink SQL API support DDL

2017-07-11 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-7148:
---
Priority: Critical  (was: Major)

> Flink SQL API support  DDL
> --
>
> Key: FLINK-7148
> URL: https://issues.apache.org/jira/browse/FLINK-7148
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> For now, Flink SQL does not support DDL operations; a user can only register a table
> by calling registerTableInternal in TableEnvironment. We should support DDL such
> as creating a table or creating a function, for example:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = ':9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 
> 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 
> 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081847#comment-16081847
 ] 

yuemeng commented on FLINK-7145:


[~fhueske]
Thanks a lot for the reply.
Some days ago, when I tried to make Flink UDAFs support multiple parameters, I ran into 
a problem.
Suppose we register more than one function with the same name, topK, in the schema.
1) Two functions with the same name:
Top10(INT,INT)
Top10(INT,DOUBLE)
2) Execute the SQL:
{code}
SELECT TUMBLE_END(rowtime, INTERVAL '10' MINUTE), top10(cnt, page) AS topPages
FROM pageVisits 
GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE);
{code}
Suppose cnt is INT and page is DOUBLE; for now Calcite can't find the exact function.
3) You can check this issue:
[https://issues.apache.org/jira/browse/CALCITE-1833]


> Flink SQL API should support multiple parameters for UserDefinedAggFunction
> ---
>
> Key: FLINK-7145
> URL: https://issues.apache.org/jira/browse/FLINK-7145
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one
> parameter; we should make Flink SQL support this.
> Once Flink SQL supports DML and multi-parameter UDAFs, we can execute SQL like:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7148) Flink SQL API support DDL

2017-07-11 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-7148:
---
Issue Type: New Feature  (was: Bug)

> Flink SQL API support  DDL
> --
>
> Key: FLINK-7148
> URL: https://issues.apache.org/jira/browse/FLINK-7148
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> For now, Flink SQL does not support DDL operations; a user can only register a table
> by calling registerTableInternal in TableEnvironment. We should support DDL such
> as creating a table or creating a function, for example:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = ':9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 
> 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 
> 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081877#comment-16081877
 ] 

yuemeng commented on FLINK-7145:


[~fhueske]
ok ,thanks for your reply,
I see, I will close this iusse .
For flink , update the calcite version will be solve this problem
I will test this new feature in flink


> Flink SQL API should support multiple parameters for UserDefinedAggFunction
> ---
>
> Key: FLINK-7145
> URL: https://issues.apache.org/jira/browse/FLINK-7145
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one
> parameter; we should make Flink SQL support this.
> Once Flink SQL supports DML and multi-parameter UDAFs, we can execute SQL like:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7148) Flink SQL API support DDL

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081862#comment-16081862
 ] 

yuemeng commented on FLINK-7148:


[~fhueske]
Could you check this feature?
If it makes sense, could you assign this issue to me?

> Flink SQL API support  DDL
> --
>
> Key: FLINK-7148
> URL: https://issues.apache.org/jira/browse/FLINK-7148
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> For now, Flink SQL does not support DDL operations; a user can only register a table
> by calling registerTableInternal in TableEnvironment. We should support DDL such
> as creating a table or creating a function, for example:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = ':9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 
> 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 
> 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7148) Flink SQL API support DDL

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081965#comment-16081965
 ] 

yuemeng edited comment on FLINK-7148 at 7/11/17 9:29 AM:
-

[~jark]
[~fhueske]
This is a duplicate of FLINK-6962; I will close this issue.


was (Author: yuemeng):
[~jark]
[~fhueske]
This is a duplicate of FLINK-6962; I will close this issue.

> Flink SQL API support  DDL
> --
>
> Key: FLINK-7148
> URL: https://issues.apache.org/jira/browse/FLINK-7148
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> For now, Flink SQL does not support DDL operations; a user can only register a table
> by calling registerTableInternal in TableEnvironment. We should support DDL such
> as creating a table or creating a function, for example:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = ':9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 
> 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 
> 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7148) Flink SQL API support DDL

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081965#comment-16081965
 ] 

yuemeng commented on FLINK-7148:


[~jark]
[~fhueske]
This is a duplicate of FLINK-6962; I will close this issue.

> Flink SQL API support  DDL
> --
>
> Key: FLINK-7148
> URL: https://issues.apache.org/jira/browse/FLINK-7148
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> For now, Flink SQL does not support DDL operations; a user can only register a table
> by calling registerTableInternal in TableEnvironment. We should support DDL such
> as creating a table or creating a function, for example:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = ':9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 
> 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 
> 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7148) Flink SQL API support DDL

2017-07-11 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng closed FLINK-7148.
--
Resolution: Duplicate

> Flink SQL API support  DDL
> --
>
> Key: FLINK-7148
> URL: https://issues.apache.org/jira/browse/FLINK-7148
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> For now, Flink SQL does not support DDL operations; a user can only register a table
> by calling registerTableInternal in TableEnvironment. We should support DDL such
> as creating a table or creating a function, for example:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = ':9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 
> 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 
> 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7148) Flink SQL API support DDL

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082009#comment-16082009
 ] 

yuemeng commented on FLINK-7148:


[~fhueske]
Thanks, I will do that.

> Flink SQL API support  DDL
> --
>
> Key: FLINK-7148
> URL: https://issues.apache.org/jira/browse/FLINK-7148
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>Priority: Critical
>
> For now, Flink SQL does not support DDL operations; a user can only register a table
> by calling registerTableInternal in TableEnvironment. We should support DDL such
> as creating a table or creating a function, for example:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = ':9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 
> 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 
> 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7151) FLINK SQL support create temporary function and table

2017-07-11 Thread yuemeng (JIRA)
yuemeng created FLINK-7151:
--

 Summary: FLINK SQL support create temporary function and table
 Key: FLINK-7151
 URL: https://issues.apache.org/jira/browse/FLINK-7151
 Project: Flink
  Issue Type: New Feature
Reporter: yuemeng


With support for creating temporary functions and tables, we can register a UDF, UDAF, 
or UDTF using SQL:
{code}
CREATE TEMPORARY function 'TOPK' AS 
'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
BY id;
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082016#comment-16082016
 ] 

yuemeng commented on FLINK-7145:


[~jark]
For TopN with different accumulate(...) signatures, there are some questions:
1) Which accumulate method should be called in the agg function? The agg function only 
calls accumulate; there is no other mechanism to find the exact accumulate method.
2) Suppose a UDAF such as topN has multiple parameters with different types. How do we 
ensure that the correct function is matched?
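To illustrate the ambiguity, here is a hypothetical UDAF (not from the Flink code base) with two accumulate overloads; when top10(cnt, page) is called with an INT and a DOUBLE argument, the planner has to decide which overload to use and which coercions to apply.
{code}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical overloaded UDAF: the same function name exposes two
// accumulate signatures, (INT, INT) and (INT, DOUBLE).
public class Top10 extends AggregateFunction<String, List<Number>> {

    @Override
    public List<Number> createAccumulator() {
        return new ArrayList<>();
    }

    public void accumulate(List<Number> acc, int cnt, int page) {
        acc.add(cnt);
    }

    public void accumulate(List<Number> acc, int cnt, double page) {
        acc.add(page);
    }

    @Override
    public String getValue(List<Number> acc) {
        return acc.toString();
    }
}
{code}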

> Flink SQL API should support multiple parameters for UserDefinedAggFunction
> ---
>
> Key: FLINK-7145
> URL: https://issues.apache.org/jira/browse/FLINK-7145
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one
> parameter; we should make Flink SQL support this.
> Once Flink SQL supports DML and multi-parameter UDAFs, we can execute SQL like:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082016#comment-16082016
 ] 

yuemeng edited comment on FLINK-7145 at 7/11/17 10:34 AM:
--

[~jark]
[~fhueske]
For TopN with different accumulate(...) signatures, there are some questions:
1) Which accumulate method should be called in the agg function? The agg function only 
calls accumulate; there is no other mechanism to find the exact accumulate method.
2) Suppose a UDAF such as topN has multiple parameters with different types. How do we 
ensure that the correct function is matched?


was (Author: yuemeng):
[~jark]
For TopN with different accumulate(...) signatures, there are some questions:
1) Which accumulate method should be called in the agg function? The agg function only 
calls accumulate; there is no other mechanism to find the exact accumulate method.
2) Suppose a UDAF such as topN has multiple parameters with different types. How do we 
ensure that the correct function is matched?

> Flink SQL API should support multiple parameters for UserDefinedAggFunction
> ---
>
> Key: FLINK-7145
> URL: https://issues.apache.org/jira/browse/FLINK-7145
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one
> parameter; we should make Flink SQL support this.
> Once Flink SQL supports DML and multi-parameter UDAFs, we can execute SQL like:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082075#comment-16082075
 ] 

yuemeng commented on FLINK-7145:


[~fhueske]
Can you tell me where I can run the following SQL?
{code}
SELECT TUMBLE_END(rowtime, INTERVAL '10' MINUTE), top10(cnt, page) AS topPages
FROM pageVisits 
GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)
{code}
I can't find any unit test with this SQL in the current master.
Are the cnt and page fields of the same type?


> Flink SQL API should support multiple parameters for UserDefinedAggFunction
> ---
>
> Key: FLINK-7145
> URL: https://issues.apache.org/jira/browse/FLINK-7145
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one
> parameter; we should make Flink SQL support this.
> Once Flink SQL supports DML and multi-parameter UDAFs, we can execute SQL like:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng closed FLINK-7145.
--
Resolution: Duplicate

> Flink SQL API should support multiple parameters for UserDefinedAggFunction
> ---
>
> Key: FLINK-7145
> URL: https://issues.apache.org/jira/browse/FLINK-7145
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one
> parameter; we should make Flink SQL support this.
> Once Flink SQL supports DML and multi-parameter UDAFs, we can execute SQL like:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table

2017-07-11 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083380#comment-16083380
 ] 

yuemeng commented on FLINK-7151:


[~fhueske]
Thanks for your reply.
I will split this issue into two issues with more specific titles. Thank you.
[~jark]
A user can use a temporary function to create a temporary UDF or UDAF for their own job, 
just like in Hive.
Suppose a user wants to use a special UDF (not a built-in function) for their own job. 
What do we do?
Maybe we can create a temporary function for this job, for example:
{code}
Hive:
create temporary function mytest as 'test.udf.ToLowerCase'; 
select mytest(test.name) from test; 
{code}
A temporary table is a conceptual table, not a physical table; we can use it for 
streaming ETL logic:
{code}
CREATE TABLE kafka_source (
  id INT,
  name VARCHAR(100),
  price INT,
  comment VARCHAR(100)
) PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = ':9092',
  group_id = 'test'
);

CREATE TABLE tmp (
  id INT,
  name VARCHAR(100),
  price INT
) PROPERTIES (
  category = 'tmp'
);


CREATE TABLE db_sink (
  id INT,
  name VARCHAR(100),
  price INT
) PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'test',
  url = 'jdbc:mysql://127.0.0.1:3307/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

-- INSERT INTO db_sink SELECT id, name,price FROM kafka_source;
--INSERT INTO db_sink SELECT id, name, sum(price) FROM kafka_source GROUP BY 
id,name HAVING sum(price) > 1 AND id < 10;
INSERT INTO tmp SELECT id,name,price + 1.0 FROM kafka_source;
INSERT into db_sink SELECT id,name,price + 1 as price FROM tmp;


{code}



> FLINK SQL support create temporary function and table
> -
>
> Key: FLINK-7151
> URL: https://issues.apache.org/jira/browse/FLINK-7151
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> With support for creating temporary functions and tables, we can register a UDF,
> UDAF, or UDTF using SQL:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table

2017-07-12 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083845#comment-16083845
 ] 

yuemeng commented on FLINK-7151:


[~jark]
Yes, I think extending the ExternalCatalog to support registering external functions is 
a very good way; maybe we can register a function with SQL DDL instead of the API (see 
the sketch below).
Here are some questions:
1) All built-in functions in Flink are registered to the schema in every job cycle, 
which is an ugly way to do it.
2) Aggregate functions such as sum and avg should be matched by Calcite; we can directly 
get the exact function from the aggCall instead of hard-coding it.
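For comparison, the API-based registration that such a DDL would replace could look roughly like this. This is only a sketch against the 1.x Table API; it reuses the hypothetical {{TopKFunction}} sketched earlier (assumed to be on the classpath) and assumes a {{kafka_source}} table has already been registered.
{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RegisterTopKExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Programmatic registration; CREATE TEMPORARY FUNCTION would express the
        // same thing purely in SQL, without touching the Java/Scala API.
        tEnv.registerFunction("TOPK", new TopKFunction());

        // Assumes a table named kafka_source was registered beforehand; the result
        // table would then be written to a sink before env.execute() is called.
        Table topPrices = tEnv.sql("SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP BY id");
    }
}
{code}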


> FLINK SQL support create temporary function and table
> -
>
> Key: FLINK-7151
> URL: https://issues.apache.org/jira/browse/FLINK-7151
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> With support for creating temporary functions and tables, we can register a UDF,
> UDAF, or UDTF using SQL:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table

2017-07-16 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089346#comment-16089346
 ] 

yuemeng commented on FLINK-7151:


[~fhueske]
Could you assign this issue to me? I want to implement this for Flink.
Thanks a lot.

> FLINK SQL support create temporary function and table
> -
>
> Key: FLINK-7151
> URL: https://issues.apache.org/jira/browse/FLINK-7151
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: yuemeng
>
> With support for creating temporary functions and tables, we can register a UDF,
> UDAF, or UDTF using SQL:
> {code}
> CREATE TEMPORARY function 'TOPK' AS 
> 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
> INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
> BY id;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)