[jira] [Assigned] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng reassigned FLINK-7076:
------------------------------

    Assignee: yuemeng

> Implement container release to support dynamic scaling
> -------------------------------------------------------
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Reporter: Till Rohrmann
> Assignee: yuemeng
> Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be
> able to dynamically free containers. We have to implement the
> {{YarnResourceManager#stopWorker}} method.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112117#comment-16112117 ]

yuemeng commented on FLINK-7076:
--------------------------------

[~till.rohrmann] To support dynamic scaling, as I understand it, we need to know when a new container should be allocated from the RM, and when a container that is marked as free should be released. This issue covers how to identify a free container (TaskManager) and release it back to the RM.

> Implement container release to support dynamic scaling
> -------------------------------------------------------
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Reporter: Till Rohrmann
> Assignee: yuemeng
> Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be
> able to dynamically free containers. We have to implement the
> {{YarnResourceManager#stopWorker}} method.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
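Editor's note: to make the discussion concrete, here is a minimal sketch of what {{YarnResourceManager#stopWorker}} could look like, assuming the resource manager keeps a map from ResourceID to the YARN Container it launched. The surrounding class and map name are illustrative, not Flink's actual implementation.

{code:java}
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class WorkerReleaser {

    // Illustrative bookkeeping: containers the RM has started, keyed by worker id.
    private final ConcurrentHashMap<ResourceID, Container> workerNodeMap = new ConcurrentHashMap<>();

    private final AMRMClientAsync<?> resourceManagerClient;

    public WorkerReleaser(AMRMClientAsync<?> resourceManagerClient) {
        this.resourceManagerClient = resourceManagerClient;
    }

    /** Releases the YARN container backing the given worker, if we know it. */
    public boolean stopWorker(ResourceID resourceID) {
        Container container = workerNodeMap.remove(resourceID);
        if (container == null) {
            return false; // unknown or already released worker
        }
        // Ask YARN's ResourceManager to reclaim the container.
        resourceManagerClient.releaseAssignedContainer(container.getId());
        return true;
    }
}
{code}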
[jira] [Assigned] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
[ https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng reassigned FLINK-7309:
------------------------------

    Assignee: Sihua Zhou

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> ------------------------------------------------------------------------------
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Liangliang Chen
> Assignee: Sihua Zhou
> Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() causes a
> NullPointerException when the SQL table field type is `TIMESTAMP` and the
> field value is `null`.
> Example to reproduce:
> {code}
> import java.sql.Timestamp
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
>
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>
>     // null field value
>     val orderA: DataStream[Order] = env.fromCollection(Seq(
>       Order(null, "beer", 3)))
>
>     tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
>     val result = tEnv.sql("SELECT * FROM OrderA")
>     result.toAppendStream[Order].print()
>
>     env.execute()
>   }
>
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() generates statements like this:
> {code}
> ...
> long result$1 =
>   org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
> boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> So the NPE happens when in1.ts() is null.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Assigned] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
[ https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng reassigned FLINK-7309:
------------------------------

    Assignee: yuemeng  (was: Sihua Zhou)

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> ------------------------------------------------------------------------------
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Liangliang Chen
> Assignee: yuemeng
> Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() causes a
> NullPointerException when the SQL table field type is `TIMESTAMP` and the
> field value is `null`.
> Example to reproduce:
> {code}
> import java.sql.Timestamp
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
>
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>
>     // null field value
>     val orderA: DataStream[Order] = env.fromCollection(Seq(
>       Order(null, "beer", 3)))
>
>     tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
>     val result = tEnv.sql("SELECT * FROM OrderA")
>     result.toAppendStream[Order].print()
>
>     env.execute()
>   }
>
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() generates statements like this:
> {code}
> ...
> long result$1 =
>   org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
> boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> So the NPE happens when in1.ts() is null.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Assigned] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
[ https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng reassigned FLINK-7309:
------------------------------

    Assignee:  (was: yuemeng)

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> ------------------------------------------------------------------------------
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Liangliang Chen
> Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() causes a
> NullPointerException when the SQL table field type is `TIMESTAMP` and the
> field value is `null`.
> Example to reproduce:
> {code}
> import java.sql.Timestamp
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
>
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>
>     // null field value
>     val orderA: DataStream[Order] = env.fromCollection(Seq(
>       Order(null, "beer", 3)))
>
>     tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
>     val result = tEnv.sql("SELECT * FROM OrderA")
>     result.toAppendStream[Order].print()
>
>     env.execute()
>   }
>
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() generates statements like this:
> {code}
> ...
> long result$1 =
>   org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
> boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> So the NPE happens when in1.ts() is null.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
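Editor's note on the likely shape of a fix, shown only as a hedged sketch rather than Flink's actual generated code: evaluate the field once, test it for null first, and convert only when non-null. The helper below mirrors the roles of isNull$2 and result$1 from the snippet above.

{code:java}
import java.sql.Timestamp;

import org.apache.calcite.runtime.SqlFunctions;

public final class NullSafeTimestampAccess {

    /** Null-safe ordering: compute the null flag first, convert only when non-null. */
    static long toInternal(Timestamp ts, boolean[] isNullOut) {
        boolean isNull = (ts == null);      // plays the role of isNull$2
        isNullOut[0] = isNull;
        // 0L is a never-read placeholder; callers must consult the null flag first.
        return isNull ? 0L : SqlFunctions.toLong(ts);   // plays the role of result$1
    }

    public static void main(String[] args) {
        boolean[] isNull = new boolean[1];
        long v = toInternal(null, isNull);  // no NPE: isNull[0] == true
        System.out.println(isNull[0] + " " + v);
    }
}
{code}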
[jira] [Created] (FLINK-11057) WHERE IN clause causes a stream inner join logical plan
yuemeng created FLINK-11057:
----------------------------

             Summary: WHERE IN clause causes a stream inner join logical plan
                 Key: FLINK-11057
                 URL: https://issues.apache.org/jira/browse/FLINK-11057
             Project: Flink
          Issue Type: Bug
            Reporter: yuemeng

{code}
select action, count(*) as cnt
from user_action
where action in ('view', 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
  'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search', 'activeclient',
  'like', 'zan', 'adclick', 'login', 'comment', 'subscribeartist', 'subscribevideo',
  'subscribedjradio', 'share', 'private', 'register', 'downloadall', 'forward', 'newdj',
  'recommendimpress', 'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
group by tumble(proctime, interval '60' SECOND), action
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-11057) WHERE IN clause causes a stream inner join logical plan
[ https://issues.apache.org/jira/browse/FLINK-11057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-11057:
----------------------------
    Description: 
{code:java}
select action, count(*) as cnt
from user_action
where action in ('view', 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
  'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search', 'activeclient',
  'like', 'zan', 'adclick', 'login', 'comment', 'subscribeartist', 'subscribevideo',
  'subscribedjradio', 'share', 'private', 'register', 'downloadall', 'forward', 'newdj',
  'recommendimpress', 'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
group by tumble(proctime, interval '60' SECOND), action
{code}
A SQL statement like this produces a stream inner join in the logical plan, but if I reduce the number of elements inside the parentheses, the inner join is no longer generated.

  was:
{code}
select action, count(*) as cnt
from user_action
where action in ('view', 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
  'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search', 'activeclient',
  'like', 'zan', 'adclick', 'login', 'comment', 'subscribeartist', 'subscribevideo',
  'subscribedjradio', 'share', 'private', 'register', 'downloadall', 'forward', 'newdj',
  'recommendimpress', 'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
group by tumble(proctime, interval '60' SECOND), action
{code}

> WHERE IN clause causes a stream inner join logical plan
> --------------------------------------------------------
>
> Key: FLINK-11057
> URL: https://issues.apache.org/jira/browse/FLINK-11057
> Project: Flink
> Issue Type: Bug
> Reporter: yuemeng
> Priority: Critical
>
> {code:java}
> select action, count(*) as cnt from user_action where action in ('view',
> 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
> 'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search',
> 'activeclient', 'like', 'zan', 'adclick', 'login', 'comment',
> 'subscribeartist', 'subscribevideo', 'subscribedjradio', 'share', 'private',
> 'register', 'downloadall', 'forward', 'newdj', 'recommendimpress',
> 'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
> group by tumble(proctime, interval '60' SECOND), action
> {code}
> A SQL statement like this produces a stream inner join in the logical plan,
> but if I reduce the number of elements inside the parentheses, the inner
> join is no longer generated.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-11057) WHERE IN clause causes a stream inner join logical plan
[ https://issues.apache.org/jira/browse/FLINK-11057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-11057:
----------------------------
    Description: 
{code:java}
select action, count(*) as cnt
from user_action
where action in ('view', 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
  'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search', 'activeclient',
  'like', 'zan', 'adclick', 'login', 'comment', 'subscribeartist', 'subscribevideo',
  'subscribedjradio', 'share', 'private', 'register', 'downloadall', 'forward', 'newdj',
  'recommendimpress', 'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
group by tumble(proctime, interval '60' SECOND), action
{code}
A SQL statement like this produces a stream inner join in the logical plan, but if I reduce the number of elements inside the parentheses, the inner join is no longer generated.

  was:
{code:java}
select action, count(*) as cnt
from user_action
where action in ('view', 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
  'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search', 'activeclient',
  'like', 'zan', 'adclick', 'login', 'comment', 'subscribeartist', 'subscribevideo',
  'subscribedjradio', 'share', 'private', 'register', 'downloadall', 'forward', 'newdj',
  'recommendimpress', 'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
group by tumble(proctime, interval '60' SECOND), action
{code}
sql such as this will be cause a stream inner join logical. but if i reduce the the element of parentheses,it can't cause the stream inner join logical

> WHERE IN clause causes a stream inner join logical plan
> --------------------------------------------------------
>
> Key: FLINK-11057
> URL: https://issues.apache.org/jira/browse/FLINK-11057
> Project: Flink
> Issue Type: Bug
> Reporter: yuemeng
> Priority: Critical
>
> {code:java}
> select action, count(*) as cnt from user_action where action in ('view',
> 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
> 'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search',
> 'activeclient', 'like', 'zan', 'adclick', 'login', 'comment',
> 'subscribeartist', 'subscribevideo', 'subscribedjradio', 'share', 'private',
> 'register', 'downloadall', 'forward', 'newdj', 'recommendimpress',
> 'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
> group by tumble(proctime, interval '60' SECOND), action
> {code}
> A SQL statement like this produces a stream inner join in the logical plan,
> but if I reduce the number of elements inside the parentheses, the inner
> join is no longer generated.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-11057) WHERE IN clause causes a stream inner join logical plan
[ https://issues.apache.org/jira/browse/FLINK-11057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708480#comment-16708480 ]

yuemeng commented on FLINK-11057:
---------------------------------

[~twalthr] Thanks for your reply. FLINK-10474 resolves this issue.

> WHERE IN clause causes a stream inner join logical plan
> --------------------------------------------------------
>
> Key: FLINK-11057
> URL: https://issues.apache.org/jira/browse/FLINK-11057
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: yuemeng
> Priority: Critical
>
> {code:java}
> select action, count(*) as cnt from user_action where action in ('view',
> 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
> 'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search',
> 'activeclient', 'like', 'zan', 'adclick', 'login', 'comment',
> 'subscribeartist', 'subscribevideo', 'subscribedjradio', 'share', 'private',
> 'register', 'downloadall', 'forward', 'newdj', 'recommendimpress',
> 'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
> group by tumble(proctime, interval '60' SECOND), action
> {code}
> A SQL statement like this produces a stream inner join in the logical plan,
> but if I reduce the number of elements inside the parentheses, the inner
> join is no longer generated.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
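Editor's note on why the size of the IN list matters: Calcite's SqlToRelConverter expands small IN lists inline as a disjunction, but once the list exceeds a configurable threshold (20 by default, if memory serves), it rewrites the predicate as a join against a VALUES relation, which is what surfaces here as a stream inner join. The IN list above has 34 literals. A hedged sketch of raising that threshold via Calcite's API (whether and where Flink exposes this knob is a separate question):

{code:java}
import org.apache.calcite.sql2rel.SqlToRelConverter;

public final class InThresholdExample {
    public static void main(String[] args) {
        // Keep even large literal IN lists as inline predicates instead of
        // rewriting them into a join with a VALUES relation.
        SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
            .withInSubQueryThreshold(Integer.MAX_VALUE) // Calcite's default is 20
            .build();
        System.out.println(config.getInSubQueryThreshold());
    }
}
{code}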
[jira] [Assigned] (FLINK-6065) Make TransportClient for ES5 pluggable
[ https://issues.apache.org/jira/browse/FLINK-6065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng reassigned FLINK-6065:
------------------------------

    Assignee: yuemeng

> Make TransportClient for ES5 pluggable
> ---------------------------------------
>
> Key: FLINK-6065
> URL: https://issues.apache.org/jira/browse/FLINK-6065
> Project: Flink
> Issue Type: Improvement
> Components: ElasticSearch Connector, Streaming Connectors
> Reporter: Robert Metzger
> Assignee: yuemeng
> Priority: Major
> Labels: pull-request-available
>
> This JIRA is based on a user request:
> http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454
> Currently, in the {{Elasticsearch5ApiCallBridge}} the
> {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this
> client pluggable to allow using other clients such as the
> {{PreBuiltXPackTransportClient}}.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
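Editor's note: a minimal sketch of one way to make the client pluggable, as the description suggests — accept a user-supplied factory instead of hardcoding {{PreBuiltTransportClient}}. The factory interface below is illustrative, not the API the connector eventually adopted.

{code:java}
import java.io.Serializable;
import java.util.Map;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

/** Illustrative factory so users can substitute e.g. PreBuiltXPackTransportClient. */
public interface TransportClientFactory extends Serializable {

    TransportClient create(Map<String, String> userConfig);

    /** Default behavior, matching what the bridge currently hardcodes. */
    static TransportClientFactory preBuilt() {
        return userConfig -> {
            Settings.Builder builder = Settings.builder();
            userConfig.forEach(builder::put); // Settings.Builder.put(String, String)
            return new PreBuiltTransportClient(builder.build());
        };
    }
}
{code}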
[jira] [Created] (FLINK-4827) Sql on streaming example use scala with wrong variable name
yuemeng created FLINK-4827:
---------------------------

             Summary: Sql on streaming example use scala with wrong variable name
                 Key: FLINK-4827
                 URL: https://issues.apache.org/jira/browse/FLINK-4827
             Project: Flink
          Issue Type: Bug
          Components: Documentation
    Affects Versions: 1.1.2, 1.1.0
            Reporter: yuemeng
            Priority: Minor
             Fix For: 1.1.3

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)

// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

There is no variable named tableEnv defined here; it's tEnv that is defined here.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4827) Sql on streaming example use scala with wrong variable name
[ https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4827:
---------------------------
    Description: 
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)

// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

There is no variable named tableEnv defined here; only tEnv is defined here.

  was:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)

// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

There is no variable named tableEnv defined here,it's tEnv defined here

> Sql on streaming example use scala with wrong variable name
> -------------------------------------------------------------
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.1.0, 1.1.2
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.3
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined here.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4827) The example use scala of SQL on Streaming Tables with wrong variable name
[ https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4827:
---------------------------
    Summary: The example use scala of SQL on Streaming Tables with wrong variable name  (was: Sql on streaming example use scala with wrong variable name)

> The example use scala of SQL on Streaming Tables with wrong variable name
> ---------------------------------------------------------------------------
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.1.0, 1.1.2
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.3
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined here.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4827) The code example use scala of SQL on Streaming Tables with wrong variable name in flink document
[ https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4827:
---------------------------
    Summary: The code example use scala of SQL on Streaming Tables with wrong variable name in flink document  (was: The example use scala of SQL on Streaming Tables with wrong variable name)

> The code example use scala of SQL on Streaming Tables with wrong variable
> name in flink document
> -----------------------------------------------------------------------------
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.1.0, 1.1.2
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.3
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> There is no variable named tableEnv defined here; only tEnv is defined here.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4827) The code example use scala of SQL on Streaming Tables with wrong variable name in flink document
[ https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4827:
---------------------------
    Description: 
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)

// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

No variable named tableEnv has been defined; only tEnv is defined here.

  was:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)

// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

There is no variable named tableEnv had defined ,only tEnv defined here

> The code example use scala of SQL on Streaming Tables with wrong variable
> name in flink document
> -----------------------------------------------------------------------------
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.1.0, 1.1.2
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.3
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> No variable named tableEnv has been defined; only tEnv is defined here.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4827) The code example use scala of SQL on Streaming Tables with wrong variable name in flink document
[ https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4827:
---------------------------
    Description: 
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)

// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

No variable named tableEnv has been defined; only tEnv is defined here.

  was:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)

// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

There is no variable named tableEnv defined here,only tEnv defined here

> The code example use scala of SQL on Streaming Tables with wrong variable
> name in flink document
> -----------------------------------------------------------------------------
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.1.0, 1.1.2
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.3
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> No variable named tableEnv has been defined; only tEnv is defined here.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4827) The Scala example of SQL on Streaming Tables uses a wrong variable name in the Flink documentation
[ https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4827:
---------------------------
    Summary: The Scala example of SQL on Streaming Tables uses a wrong variable name in the Flink documentation  (was: The code example use scala of SQL on Streaming Tables with wrong variable name in flink document)

> The Scala example of SQL on Streaming Tables uses a wrong variable name in
> the Flink documentation
> -----------------------------------------------------------------------------
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.1.0, 1.1.2
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.3
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> No variable named tableEnv has been defined; only tEnv is defined here.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4827) The Scala example of SQL on Streaming Tables uses a wrong variable name in the Flink documentation
[ https://issues.apache.org/jira/browse/FLINK-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4827:
---------------------------
    Attachment: 0001-The-scala-example-of-SQL-on-Streaming-Tables-with-wr.patch

> The Scala example of SQL on Streaming Tables uses a wrong variable name in
> the Flink documentation
> -----------------------------------------------------------------------------
>
> Key: FLINK-4827
> URL: https://issues.apache.org/jira/browse/FLINK-4827
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.1.0, 1.1.2
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.3
>
> Attachments: 0001-The-scala-example-of-SQL-on-Streaming-Tables-with-wr.patch
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // read a DataStream from an external source
> val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
> // register the DataStream under the name "Orders"
> tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
> // run a SQL query on the Table and retrieve the result as a new Table
> val result = tableEnv.sql(
>   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
> No variable named tableEnv has been defined; only tEnv is defined here.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
yuemeng created FLINK-4879:
---------------------------

             Summary: class KafkaTableSource should be public just like KafkaTableSink
                 Key: FLINK-4879
                 URL: https://issues.apache.org/jira/browse/FLINK-4879
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.1.3, 1.1.1
            Reporter: yuemeng
            Priority: Minor
             Fix For: 1.1.4

class KafkaTableSource should be public, just like KafkaTableSink. By default its modifier is package-private, so it can't be accessed from outside its package. For example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because the KafkaTableSource modifier is package-private, we can't declare this function's result type as KafkaTableSource; we must name a specific subtype. If some other Kafka source extends KafkaTableSource and we are not sure which subclass of KafkaTableSource should be used, how can we specify the type?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
[ https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4879:
---------------------------
    Description: 
class KafkaTableSource should be public, just like KafkaTableSink. By default its modifier is package-private, so it can't be accessed from outside its package.
For example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because the KafkaTableSource modifier is package-private, we can't declare this function's result type as KafkaTableSource; we must name a specific subtype. If some other Kafka source extends KafkaTableSource and we are not sure which subclass of KafkaTableSource should be used, how can we specify the type?

  was:
class KafkaTableSource should be public just like KafkaTableSink,by default,it's modifier is default ,and we cann't access out of it's package,for example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because of the class KafkaTableSource modifier is default,we cann't define this function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which subclass of KafkaTableSource should be use,how can we specific the type?

> class KafkaTableSource should be public just like KafkaTableSink
> ------------------------------------------------------------------
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1, 1.1.3
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.4
>
> class KafkaTableSource should be public, just like KafkaTableSink. By
> default its modifier is package-private, so it can't be accessed from
> outside its package. For example:
> def createKafkaTableSource(
>     topic: String,
>     properties: Properties,
>     deserializationSchema: DeserializationSchema[Row],
>     fieldsNames: Array[String],
>     typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
>   if (deserializationSchema != null) {
>     new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
>   } else {
>     new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
>   }
> }
> Because the KafkaTableSource modifier is package-private, we can't declare
> this function's result type as KafkaTableSource; we must name a specific
> subtype. If some other Kafka source extends KafkaTableSource and we are not
> sure which subclass of KafkaTableSource should be used, how can we specify
> the type?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
[ https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4879:
---------------------------
    Description: 
class KafkaTableSource should be public, just like KafkaTableSink. By default its modifier is package-private, so it can't be accessed from outside its package.
For example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because the KafkaTableSource modifier is package-private, we can't declare this function's result type as KafkaTableSource; we must name a specific subtype. If some other Kafka source extends KafkaTableSource and we are not sure which subclass of KafkaTableSource should be used, how can we specify the type?

  was:
class KafkaTableSource should be public just like KafkaTableSink,by default,it's modifier is default ,and we cann't access out of it's package, for example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because of the class KafkaTableSource modifier is default,we cann't define this function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which subclass of KafkaTableSource should be use,how can we specific the type?

> class KafkaTableSource should be public just like KafkaTableSink
> ------------------------------------------------------------------
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1, 1.1.3
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.4
>
> class KafkaTableSource should be public, just like KafkaTableSink. By
> default its modifier is package-private, so it can't be accessed from
> outside its package. For example:
>
> def createKafkaTableSource(
>     topic: String,
>     properties: Properties,
>     deserializationSchema: DeserializationSchema[Row],
>     fieldsNames: Array[String],
>     typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
>   if (deserializationSchema != null) {
>     new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
>   } else {
>     new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
>   }
> }
>
> Because the KafkaTableSource modifier is package-private, we can't declare
> this function's result type as KafkaTableSource; we must name a specific
> subtype. If some other Kafka source extends KafkaTableSource and we are not
> sure which subclass of KafkaTableSource should be used, how can we specify
> the type?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
[ https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4879:
---------------------------
    Description: 
*class KafkaTableSource should be public, just like KafkaTableSink. By default its modifier is package-private, so it can't be accessed from outside its package.*
For example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because the KafkaTableSource modifier is package-private, we can't declare this function's result type as KafkaTableSource; we must name a specific subtype. If some other Kafka source extends KafkaTableSource and we are not sure which subclass of KafkaTableSource should be used, how can we specify the type?

  was:
class KafkaTableSource should be public just like KafkaTableSink,by default,it's modifier is default ,and we cann't access out of it's package, for example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because of the class KafkaTableSource modifier is default,we cann't define this function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which subclass of KafkaTableSource should be use,how can we specific the type?

> class KafkaTableSource should be public just like KafkaTableSink
> ------------------------------------------------------------------
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1, 1.1.3
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.4
>
> *class KafkaTableSource should be public, just like KafkaTableSink. By
> default its modifier is package-private, so it can't be accessed from
> outside its package.*
> For example:
>
> def createKafkaTableSource(
>     topic: String,
>     properties: Properties,
>     deserializationSchema: DeserializationSchema[Row],
>     fieldsNames: Array[String],
>     typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
>   if (deserializationSchema != null) {
>     new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
>   } else {
>     new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
>   }
> }
>
> Because the KafkaTableSource modifier is package-private, we can't declare
> this function's result type as KafkaTableSource; we must name a specific
> subtype. If some other Kafka source extends KafkaTableSource and we are not
> sure which subclass of KafkaTableSource should be used, how can we specify
> the type?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
[ https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4879:
---------------------------
    Description: 
class KafkaTableSource should be public, just like KafkaTableSink. By default its modifier is package-private, so it can't be accessed from outside its package.
For example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because the KafkaTableSource modifier is package-private, we can't declare this function's result type as KafkaTableSource; we must name a specific subtype. If some other Kafka source extends KafkaTableSource and we are not sure which subclass of KafkaTableSource should be used, how can we specify the type?

  was:
*** class KafkaTableSource should be public just like KafkaTableSink,by default,it's modifier is default ,and we cann't access out of it's package***, for example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because of the class KafkaTableSource modifier is default,we cann't define this function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which subclass of KafkaTableSource should be use,how can we specific the type?

> class KafkaTableSource should be public just like KafkaTableSink
> ------------------------------------------------------------------
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1, 1.1.3
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.4
>
> class KafkaTableSource should be public, just like KafkaTableSink. By
> default its modifier is package-private, so it can't be accessed from
> outside its package. For example:
> def createKafkaTableSource(
>     topic: String,
>     properties: Properties,
>     deserializationSchema: DeserializationSchema[Row],
>     fieldsNames: Array[String],
>     typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
>   if (deserializationSchema != null) {
>     new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
>   } else {
>     new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
>   }
> }
> Because the KafkaTableSource modifier is package-private, we can't declare
> this function's result type as KafkaTableSource; we must name a specific
> subtype. If some other Kafka source extends KafkaTableSource and we are not
> sure which subclass of KafkaTableSource should be used, how can we specify
> the type?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
[ https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4879:
---------------------------
    Attachment: 0001-class-KafkaTableSource-should-be-public.patch

> class KafkaTableSource should be public just like KafkaTableSink
> ------------------------------------------------------------------
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1, 1.1.3
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.4
>
> Attachments: 0001-class-KafkaTableSource-should-be-public.patch
>
> class KafkaTableSource should be public, just like KafkaTableSink. By
> default its modifier is package-private, so it can't be accessed from
> outside its package. For example:
> def createKafkaTableSource(
>     topic: String,
>     properties: Properties,
>     deserializationSchema: DeserializationSchema[Row],
>     fieldsNames: Array[String],
>     typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
>   if (deserializationSchema != null) {
>     new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
>   } else {
>     new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
>   }
> }
> Because the KafkaTableSource modifier is package-private, we can't declare
> this function's result type as KafkaTableSource; we must name a specific
> subtype. If some other Kafka source extends KafkaTableSource and we are not
> sure which subclass of KafkaTableSource should be used, how can we specify
> the type?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
[ https://issues.apache.org/jira/browse/FLINK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-4879:
---------------------------
    Description: 
*class KafkaTableSource should be public, just like KafkaTableSink. By default its modifier is package-private, so it can't be accessed from outside its package.*
For example:
{code}
def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}
{code}
Because the KafkaTableSource modifier is package-private, we can't declare this function's result type as KafkaTableSource; we must name a specific subtype. If some other Kafka source extends KafkaTableSource and we are not sure which subclass of KafkaTableSource should be used, how can we specify the type?

  was:
class KafkaTableSource should be public just like KafkaTableSink,by default,it's modifier is default ,and we cann't access out of it's package, for example:

def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}

Because of the class KafkaTableSource modifier is default,we cann't define this function result type with KafkaTableSource ,we must give the specific type.
if some other kafka source extends KafkaTableSource ,and we don't sure which subclass of KafkaTableSource should be use,how can we specific the type?

> class KafkaTableSource should be public just like KafkaTableSink
> ------------------------------------------------------------------
>
> Key: FLINK-4879
> URL: https://issues.apache.org/jira/browse/FLINK-4879
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1, 1.1.3
> Reporter: yuemeng
> Priority: Minor
> Fix For: 1.1.4
>
> Attachments: 0001-class-KafkaTableSource-should-be-public.patch
>
> *class KafkaTableSource should be public, just like KafkaTableSink. By
> default its modifier is package-private, so it can't be accessed from
> outside its package.*
> For example:
> {code}
> def createKafkaTableSource(
>     topic: String,
>     properties: Properties,
>     deserializationSchema: DeserializationSchema[Row],
>     fieldsNames: Array[String],
>     typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
>   if (deserializationSchema != null) {
>     new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
>   } else {
>     new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
>   }
> }
> {code}
> Because the KafkaTableSource modifier is package-private, we can't declare
> this function's result type as KafkaTableSource; we must name a specific
> subtype. If some other Kafka source extends KafkaTableSource and we are not
> sure which subclass of KafkaTableSource should be used, how can we specify
> the type?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
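Editor's note: a Java analog of the factory above, which would only compile once the class is public. This is a sketch; the import locations follow the Flink 1.1-era APIs and may differ in other versions.

{code:java}
import java.util.Properties;

// Package locations are version-dependent; these follow the Flink 1.1-era APIs.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public final class KafkaTableSources {

    /** Returns the abstract supertype — legal only if KafkaTableSource is public. */
    public static KafkaTableSource createKafkaTableSource(
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            String[] fieldNames,
            TypeInformation<?>[] fieldTypes) {
        return deserializationSchema != null
            ? new Kafka09TableSource(topic, properties, deserializationSchema, fieldNames, fieldTypes)
            : new Kafka09JsonTableSource(topic, properties, fieldNames, fieldTypes);
    }
}
{code}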
[jira] [Created] (FLINK-5324) JVM Options will work for both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
yuemeng created FLINK-5324:
---------------------------

             Summary: JVM Options will work for both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
                 Key: FLINK-5324
                 URL: https://issues.apache.org/jira/browse/FLINK-5324
             Project: Flink
          Issue Type: Bug
          Components: YARN
    Affects Versions: 1.1.3
            Reporter: yuemeng
            Priority: Critical

YarnApplicationMasterRunner and YarnTaskManager both use the following code to get their JVM options:
{code}
final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code}
So when we add JVM options for one of them, the options take effect for both.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-5324) JVM Options will work for both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
[ https://issues.apache.org/jira/browse/FLINK-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-5324:
---------------------------
    Description: 
YarnApplicationMasterRunner and YarnTaskManager both use the following code to get their JVM options:
{code}
final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code}
So when we add JVM options for one of them, the options take effect for both.

  was:
YarnApplicationMasterRunner and YarnTaskManager both use follow code to get jvm options
{code}
final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code/}
so when we add some jvm options for one of them ,it will be both worked

> JVM Options will work for both YarnApplicationMasterRunner and
> YarnTaskManager in YARN mode
> -----------------------------------------------------------------------------
>
> Key: FLINK-5324
> URL: https://issues.apache.org/jira/browse/FLINK-5324
> Project: Flink
> Issue Type: Bug
> Components: YARN
> Affects Versions: 1.1.3
> Reporter: yuemeng
> Priority: Critical
>
> YarnApplicationMasterRunner and YarnTaskManager both use the following code
> to get their JVM options:
> {code}
> final String javaOpts =
>   flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
> {code}
> So when we add JVM options for one of them, the options take effect for both.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
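Editor's note: one way out, sketched here with illustrative key names, is to read role-specific keys that fall back to the shared one (Flink later added separate env.java.opts.jobmanager / env.java.opts.taskmanager options, if memory serves):

{code:java}
import org.apache.flink.configuration.Configuration;

public final class JvmOptsLookup {

    /** Resolve JVM options for one role, falling back to the shared key. Key names illustrative. */
    static String jvmOptsFor(Configuration conf, String role) {
        String shared = conf.getString("env.java.opts", "");
        return conf.getString("env.java.opts." + role, shared);
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("env.java.opts", "-XX:+UseG1GC");
        conf.setString("env.java.opts.taskmanager", "-Xloggc:/tmp/tm-gc.log");
        System.out.println(jvmOptsFor(conf, "jobmanager"));   // falls back to the shared options
        System.out.println(jvmOptsFor(conf, "taskmanager"));  // role-specific options win
    }
}
{code}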
[jira] [Updated] (FLINK-5324) JVM Options will work for both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
[ https://issues.apache.org/jira/browse/FLINK-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-5324:
---------------------------
    Attachment: 0001-FLINK-5324-yarn-JVM-Opitons-will-work-for-both-YarnA.patch

> JVM Options will work for both YarnApplicationMasterRunner and
> YarnTaskManager in YARN mode
> -----------------------------------------------------------------------------
>
> Key: FLINK-5324
> URL: https://issues.apache.org/jira/browse/FLINK-5324
> Project: Flink
> Issue Type: Bug
> Components: YARN
> Affects Versions: 1.1.3
> Reporter: yuemeng
> Priority: Critical
>
> Attachments: 0001-FLINK-5324-yarn-JVM-Opitons-will-work-for-both-YarnA.patch
>
> YarnApplicationMasterRunner and YarnTaskManager both use the following code
> to get their JVM options:
> {code}
> final String javaOpts =
>   flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
> {code}
> So when we add JVM options for one of them, the options take effect for both.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Closed] (FLINK-9329) hasRowtimeAttribute will throw an NPE if the user uses setProctimeAttribute for a table source
[ https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng closed FLINK-9329.
--------------------------
    Resolution: Invalid

> hasRowtimeAttribute will throw an NPE if the user uses setProctimeAttribute
> for a table source
> -----------------------------------------------------------------------------
>
> Key: FLINK-9329
> URL: https://issues.apache.org/jira/browse/FLINK-9329
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Critical
>
> Example:
> {code:java}
> KafkaTableSource source = Kafka010JsonTableSource.builder()
>   .withSchema(TableSchema.builder()
>     .field("sensorId", Types.LONG())
>     .field("temp", Types.DOUBLE())
>     .field("ptime", Types.SQL_TIMESTAMP()).build())
>   .withProctimeAttribute("ptime")
>   .build();
>
> tableEnv.registerTableSource("flights", source);
> {code}
> Kafka010JsonTableSource implements DefinedRowtimeAttributes, so when
> TableSourceUtil calls hasRowtimeAttribute(source) to check, it invokes the
> following code:
> {code}
> /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
> private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
>   tableSource match {
>     case r: DefinedRowtimeAttributes =>
>       r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
>     case _ =>
>       Array()
>   }
> }
> {code}
> r.getRowtimeAttributeDescriptors throws an NPE because we use a proctime
> attribute here, so the source has no rowtime descriptors to return.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
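Editor's note: a defensive variant of that lookup, sketched in Java (the Flink utility above is Scala; the type names follow Flink's table API, but the guard itself is only illustrative):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;

public final class RowtimeAttributes {

    /** Null-safe: a proctime-only source may report no rowtime descriptors at all. */
    static List<String> getRowtimeAttributes(TableSource<?> tableSource) {
        if (tableSource instanceof DefinedRowtimeAttributes) {
            List<RowtimeAttributeDescriptor> descriptors =
                ((DefinedRowtimeAttributes) tableSource).getRowtimeAttributeDescriptors();
            if (descriptors != null) {
                return descriptors.stream()
                    .map(RowtimeAttributeDescriptor::getAttributeName)
                    .collect(Collectors.toList());
            }
        }
        return Collections.emptyList(); // no rowtime attributes, but no NPE either
    }
}
{code}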
[jira] [Assigned] (FLINK-5726) Add the RocketMQ plugin for Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-5726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng reassigned FLINK-5726:
------------------------------

    Assignee: yuemeng

> Add the RocketMQ plugin for Apache Flink
> -----------------------------------------
>
> Key: FLINK-5726
> URL: https://issues.apache.org/jira/browse/FLINK-5726
> Project: Flink
> Issue Type: Task
> Components: Streaming Connectors
> Reporter: Longda Feng
> Assignee: yuemeng
> Priority: Minor
>
> Apache RocketMQ® is an open source distributed messaging and streaming data
> platform. It is used by a lot of companies. Please refer to
> http://rocketmq.incubator.apache.org/ for more details.
> Since Apache RocketMQ 4.0 will be released in the next few days, we can
> start the job of adding the RocketMQ plugin for Apache Flink.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9201) same merged window will be fired twice if the watermark has already passed the new merged window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuemeng updated FLINK-9201:
---------------------------
    Summary: same merged window will be fired twice if the watermark has already passed the new merged window  (was: same merge window will fire twice if watermark already pass the window for merge windows)

> same merged window will be fired twice if the watermark has already passed
> the new merged window
> -----------------------------------------------------------------------------
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
>
> Consider a sum over session windows. Suppose the session gap is 3 seconds
> and allowedLateness is 60 seconds.
> w1 = TimeWindow[1,9] holds the elements 1, 2, 3, 6 and is fired once the
> watermark reaches 9.
> Then a late element arrives as w2 = TimeWindow[7,10] while the watermark is
> already at 11.
> w1 and w2 are merged into a new window w3 = TimeWindow[1,10], and a new
> timer is registered via triggerContext.onMerge(mergedWindows); w3 is then
> fired via triggerContext.onElement(element) because the watermark has
> passed w3.
> But w3 is fired again because the merge timer is earlier than the current
> watermark.
> That means w3 fires twice whenever the watermark has already passed the
> newly merged window w3.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-9201) same merge window will fire twice if watermark already pass the window for merge windows
yuemeng created FLINK-9201: -- Summary: same merge window will fire twice if watermark already pass the window for merge windows
Key: FLINK-9201
URL: https://issues.apache.org/jira/browse/FLINK-9201
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.3.3
Reporter: yuemeng
Assignee: yuemeng

Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds. w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9. Then a late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11. w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is then fired by triggerContext.onElement(element) because the watermark has passed w3, but w3 is fired again because the timer < current watermark. That means w3 is fired twice because the watermark has passed the new merged window w3.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9201: --- Description: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds. w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9. Then a late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11. w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark. That means w3 is fired twice because the watermark has passed the new merged window w3.

was: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds. w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9. Then a late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11. w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is then fired by triggerContext.onElement(element) because the watermark has passed w3, but w3 is fired again because the timer < current watermark. That means w3 is fired twice because the watermark has passed the new merged window w3.

> same merge window will be fired twice if watermark already pass the new merged window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9201: --- Description: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds.
* w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9.
* A late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11.
* w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark.

That means w3 is fired twice because the watermark has passed the new merged window w3.

was: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds. w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9. Then a late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11. w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark. That means w3 is fired twice because the watermark has passed the new merged window w3.

> same merge window will be fired twice if watermark already pass the new merged window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9201: --- Description: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds.
* w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9.
* A late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11.
* w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark.

That means w3 is fired twice because the watermark has passed the new merged window w3.

Examples:
{code}
@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
    closeCalled.set(0);

    final int sessionSize = 3;

    TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

    ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
        inputType.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
        EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
        new TimeWindow.Serializer(),
        new TupleKeySelector(),
        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
        stateDesc,
        new InternalIterableWindowFunction<>(new SessionWindowFunction()),
        EventTimeTrigger.create(),
        6,
        null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
        createTestHarness(operator);

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    testHarness.open();

    // add elements out-of-order
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));

    testHarness.processWatermark(new Watermark(5500));

    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
    expectedOutput.add(new Watermark(5500));

    // do a snapshot, close and restore again
    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
    testHarness.close();

    testHarness = createTestHarness(operator);
    testHarness.setup();
    testHarness.initializeState(snapshot);
    testHarness.open();

    expectedOutput.clear();

    // suppose the watermark has already arrived at 1
    testHarness.processWatermark(new Watermark(1));

    // a late element with timestamp 4500 arrives; the new session window [0, 7500] is still a valid window
    // because maxTimestamp < cleanupTime, and it is fired immediately
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));

    expectedOutput.add(new Watermark(1));
    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));

    // when a new watermark arrives, the same TimeWindow[0, 7500] will be fired again
    testHarness.processWatermark(new Watermark(11000));

    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
    expectedOutput.add(new Watermark(11000));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

    testHarness.close();
}
{code}

was: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds.
* w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9.
* A late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11.
* w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark.

That means w3 is fired twice because the watermark has passed the new merged window w3.

> same merge window will be fired twice if watermark already pass the new merged window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9201: --- Description: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds.
* w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9.
* A late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11.
* w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark.

That means w3 is fired twice because the watermark has passed the new merged window w3.

Examples:
{code:java}
@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
    closeCalled.set(0);

    final int sessionSize = 3;

    TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

    ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
        inputType.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
        EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
        new TimeWindow.Serializer(),
        new TupleKeySelector(),
        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
        stateDesc,
        new InternalIterableWindowFunction<>(new SessionWindowFunction()),
        EventTimeTrigger.create(),
        6,
        null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
        createTestHarness(operator);

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    testHarness.open();

    // add elements out-of-order
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));

    testHarness.processWatermark(new Watermark(5500));

    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
    expectedOutput.add(new Watermark(5500));

    // do a snapshot, close and restore again
    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
    testHarness.close();

    testHarness = createTestHarness(operator);
    testHarness.setup();
    testHarness.initializeState(snapshot);
    testHarness.open();

    expectedOutput.clear();

    // suppose the watermark has already arrived at 1
    testHarness.processWatermark(new Watermark(1));

    // a late element with timestamp 4500 arrives; the new session window [0, 7500] is still a valid window
    // because maxTimestamp < cleanupTime, and it is fired immediately
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));

    expectedOutput.add(new Watermark(1));
    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));

    // when a new watermark arrives, the same TimeWindow[0, 7500] will be fired again
    // because of a new timer registered by calling triggerOnMerge
    testHarness.processWatermark(new Watermark(11000));

    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
    expectedOutput.add(new Watermark(11000));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

    testHarness.close();
}
{code}

was: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds.
* w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9.
* A late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11.
* w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark.

That means w3 is fired twice because the watermark has passed the new merged window w3.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already pass the new merged window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9201: --- Description: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds.
* w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9.
* A late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11.
* w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark.

That means w3 is fired twice because the watermark has passed the new merged window w3.

Examples:
{code:java}
@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
    closeCalled.set(0);

    final int sessionSize = 3;

    TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

    ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
        inputType.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
        EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
        new TimeWindow.Serializer(),
        new TupleKeySelector(),
        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
        stateDesc,
        new InternalIterableWindowFunction<>(new SessionWindowFunction()),
        EventTimeTrigger.create(),
        6,
        null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
        createTestHarness(operator);

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    testHarness.open();

    // add elements out-of-order
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));

    testHarness.processWatermark(new Watermark(5500));

    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
    expectedOutput.add(new Watermark(5500));

    // do a snapshot, close and restore again
    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
    testHarness.close();

    testHarness = createTestHarness(operator);
    testHarness.setup();
    testHarness.initializeState(snapshot);
    testHarness.open();

    expectedOutput.clear();

    // suppose the watermark has already arrived at 1
    testHarness.processWatermark(new Watermark(1));

    // a late element with timestamp 4500 arrives; the new session window [0, 7500] is still a valid window
    // because maxTimestamp < cleanupTime, and it is fired immediately
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));

    expectedOutput.add(new Watermark(1));
    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));

    // when a new watermark arrives, the same TimeWindow[0, 7500] will be fired again
    // because of a new timer registered by calling triggerOnMerge
    testHarness.processWatermark(new Watermark(11000));

    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
    expectedOutput.add(new Watermark(11000));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());

    testHarness.close();
}
{code}

was: Sum with a session window. Suppose the session gap is 3 seconds and allowedLateness is 60 seconds.
* w1, TimeWindow[1,9], holds elements 1,2,3,6 and will be fired once the watermark reaches 9.
* A late element (w2, TimeWindow[7,10]) arrives while the watermark is already at 11.
* w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows); w3 is fired a first time by triggerContext.onElement(element) because the watermark has passed w3, and a second time because the timer < current watermark.

That means w3 is fired twice because the watermark has passed the new merged window w3.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9201: --- Summary: same merge window will be fired twice if watermark already passed the merge window (was: same merge window will be fired twice if watermark already passed the new merged window)

> same merge window will be fired twice if watermark already passed the merge window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already passed the new merged window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9201: --- Summary: same merge window will be fired twice if watermark already passed the new merged window (was: same merge window will be fired twice if watermark already pass the new merged window)

> same merge window will be fired twice if watermark already passed the new merged window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453759#comment-16453759 ] yuemeng edited comment on FLINK-9201 at 4/26/18 9:40 AM: - Hi [~till.rohrmann], can you help me check this issue? Thanks a lot.

was (Author: yuemeng): Hi [~till.rohrmann], can you check this issue?

> same merge window will be fired twice if watermark already passed the merge window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453759#comment-16453759 ] yuemeng commented on FLINK-9201: Hi [~till.rohrmann], can you check this issue?

> same merge window will be fired twice if watermark already passed the merge window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460606#comment-16460606 ] yuemeng commented on FLINK-9201: Hi [~jark], [~StephanEwen], can you check this issue? Thanks.

> same merge window will be fired twice if watermark already passed the merge window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-7146) FLINK SQLs support DDL
[ https://issues.apache.org/jira/browse/FLINK-7146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng closed FLINK-7146. -- Resolution: Duplicate

> FLINK SQLs support DDL
> --
>
> Key: FLINK-7146
> URL: https://issues.apache.org/jira/browse/FLINK-7146
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: yuemeng
> Priority: Major
>
> For now, Flink SQL cannot support DDL; we can only register a table by calling registerTableInternal in TableEnvironment. We should support DDL in SQL, such as creating a table or a function, like:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = 'xx:9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY function 'AVGUDAF' AS 'com.x.server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
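For contrast with the proposed DDL, roughly the same pipeline has to be wired programmatically today. The following is only a sketch, assuming a Table API of roughly the 1.7 era; the `source`, `sink`, and `avgUdaf` parameters stand for the artifacts the DDL above would declare:

{code:java}
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;

public final class DdlEquivalent {

    /** Registers the source, sink, and UDAF by hand and runs the INSERT from the DDL example. */
    public static <ACC> void wire(
            StreamTableEnvironment tEnv,
            TableSource<?> source,          // e.g. a Kafka JSON table source
            TableSink<?> sink,              // e.g. a configured JDBC table sink for the MySQL target
            AggregateFunction<Integer, ACC> avgUdaf) {

        tEnv.registerTableSource("kafka_source", source);
        tEnv.registerTableSink("db_sink", sink);
        tEnv.registerFunction("AVGUDAF", avgUdaf);

        tEnv.sqlUpdate("INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id");
    }
}
{code}

The DDL proposal essentially moves these registration calls out of Java code and into SQL statements that can ship with a job definition.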
[jira] [Commented] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468219#comment-16468219 ] yuemeng commented on FLINK-9201: [~aljoscha] Thanks for your suggestion; I will move the test to EventTimeTriggerTest.

> same merge window will be fired twice if watermark already passed the merge window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
> Fix For: 1.6.0
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468219#comment-16468219 ] yuemeng edited comment on FLINK-9201 at 5/9/18 2:12 AM: [~aljoscha] Thanks for your suggestion; I will add a new test case to EventTimeTriggerTest but still keep the old test in WindowOperatorTest, because only merge windows exhibit the problem now.

was (Author: yuemeng): [~aljoscha] Thanks for your suggestion; I will move the test to EventTimeTriggerTest.

> same merge window will be fired twice if watermark already passed the merge window
> --
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
> Fix For: 1.6.0
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9329) hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source
yuemeng created FLINK-9329: -- Summary: hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source
Key: FLINK-9329
URL: https://issues.apache.org/jira/browse/FLINK-9329
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: yuemeng
Assignee: yuemeng

{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    // ...
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        // field "ptime" is of type SQL_TIMESTAMP
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    // declare "ptime" as a processing time attribute
    .withProctimeAttribute("ptime")
    .build();
tableEnv.registerTableSource("flights", source);
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9329) hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source
[ https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9329: --- Description:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    .withProctimeAttribute("ptime")
    .build();
tableEnv.registerTableSource("flights", source);
{code}
Kafka010JsonTableSource implements DefinedRowtimeAttributes, so TableSourceUtil.hasRowtimeAttribute(source) will call:
{code}
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
    case r: DefinedRowtimeAttributes =>
      r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
    case _ =>
      Array()
  }
}
{code}
r.getRowtimeAttributeDescriptors will throw an NPE because we use a proctime attribute here.

was:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    // ...
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        // field "ptime" is of type SQL_TIMESTAMP
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    // declare "ptime" as a processing time attribute
    .withProctimeAttribute("ptime")
    .build();
tableEnv.registerTableSource("flights", source);
{code}

> hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source
> --
>
> Key: FLINK-9329
> URL: https://issues.apache.org/jira/browse/FLINK-9329
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Critical
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9329) hasRowtimeAttribute will throw NPE if user uses setProctimeAttribute for table source
[ https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-9329: --- Description: Examples:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    .withProctimeAttribute("ptime")
    .build();

tableEnv.registerTableSource("flights", source);
{code}
Kafka010JsonTableSource implements DefinedRowtimeAttributes, so when TableSourceUtil calls hasRowtimeAttribute(source) to check the source, it ends up in the following code:
{code}
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
    case r: DefinedRowtimeAttributes =>
      r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
    case _ =>
      Array()
  }
}
{code}
r.getRowtimeAttributeDescriptors throws an NPE because only a proctime attribute is configured here.

was:
{code:java}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    .withProctimeAttribute("ptime")
    .build();

tableEnv.registerTableSource("flights", source);
{code}
Kafka010JsonTableSource implements DefinedRowtimeAttributes, so TableSourceUtil.hasRowtimeAttribute(source) ends up calling:
{code}
/** Returns a list with all rowtime attribute names of the [[TableSource]]. */
private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
  tableSource match {
    case r: DefinedRowtimeAttributes =>
      r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
    case _ =>
      Array()
  }
}
{code}
r.getRowtimeAttributeDescriptors throws an NPE because only a proctime attribute is configured here.

> hasRowtimeAttribute will throw NPE if user uses setProctimeAttribute for table source > --- > > Key: FLINK-9329 > URL: https://issues.apache.org/jira/browse/FLINK-9329 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Reporter: yuemeng > Assignee: yuemeng > Priority: Critical >
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
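For illustration, the NPE can be avoided by making the lookup null-safe. A minimal sketch in Java (the planner code above is Scala), assuming, as the report implies, that a proctime-only source may return null from getRowtimeAttributeDescriptors:
{code:java}
import java.util.List;

import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;

// Null-safe variant: a source that only configures a proctime attribute
// no longer causes an NPE when its rowtime attributes are queried.
public final class RowtimeAttributes {

    public static String[] of(TableSource<?> tableSource) {
        if (tableSource instanceof DefinedRowtimeAttributes) {
            List<RowtimeAttributeDescriptor> descriptors =
                    ((DefinedRowtimeAttributes) tableSource).getRowtimeAttributeDescriptors();
            if (descriptors == null) {
                return new String[0]; // proctime-only source
            }
            return descriptors.stream()
                    .map(RowtimeAttributeDescriptor::getAttributeName)
                    .toArray(String[]::new);
        }
        return new String[0];
    }
}
{code}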
[jira] [Created] (FLINK-14666) support multiple catalogs in Flink Table SQL
yuemeng created FLINK-14666: --- Summary: support multiple catalogs in Flink Table SQL Key: FLINK-14666 URL: https://issues.apache.org/jira/browse/FLINK-14666 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.1, 1.9.0, 1.8.2, 1.8.0 Reporter: yuemeng
Currently, Calcite will only use the current catalog as the schema path to validate a SQL node; maybe this is not reasonable.
{code}
tableEnvironment.useCatalog("user_catalog");
tableEnvironment.useDatabase("user_db");
Table table = tableEnvironment.sqlQuery("SELECT action, os, count(*) as cnt from music_queue_3 group by action, os, tumble(proctime, INTERVAL '10' SECOND)");
tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
tableEnvironment.registerTable("v2", t2);
tableEnvironment.sqlUpdate("INSERT INTO database2.kafka_table_test1 SELECT action, os, cast(cnt as BIGINT) as cnt from v2");
{code}
Suppose the source table music_queue_3 and the sink table kafka_table_test1 are both in the user_catalog catalog, but temporary tables or views such as v1 and v2 are registered in the default catalog. When we select from the temporary table v2 and insert into our own catalog table database2.kafka_table_test1, it always fails at SQL node validation, because the schema path in the catalog reader is the current catalog without the default catalog, so the temporary table or view will never be identified.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14666) support multiple catalogs in Flink Table SQL
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969842#comment-16969842 ] yuemeng commented on FLINK-14666: - [~ykt836][~jark] Can you check this issue? Thanks.
> support multiple catalogs in Flink Table SQL > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 > Reporter: yuemeng > Priority: Critical > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-14666) support multiple catalogs in Flink Table SQL
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969842#comment-16969842 ] yuemeng edited comment on FLINK-14666 at 11/8/19 5:27 AM: -- [~ykt836][~jark] Can you check this issue? If some of you agree that this is an issue, can you assign it to me? I have already fixed it on the latest version.
was (Author: yuemeng): [~ykt836][~jark] Can you check this issue? Thanks.
> support multiple catalogs in Flink Table SQL > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 > Reporter: yuemeng > Priority: Critical > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14666) support multiple catalogs in Flink Table SQL
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969951#comment-16969951 ] yuemeng commented on FLINK-14666: - [~ykt836] But when you select some fields from a temporary table into our own catalog table, the two tables are not in the same catalog, so SQL node validation can't pass.
> support multiple catalogs in Flink Table SQL > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 > Reporter: yuemeng > Priority: Critical > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14666) support multiple catalogs in Flink Table SQL
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16970052#comment-16970052 ] yuemeng commented on FLINK-14666: - [~dwysakowicz] Thanks for your reply. The fully qualified name works well, but in Flink 1.9.0 temporary tables also need to be fully qualified, such as:
{code}
tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from default_catalog.default_database.v1");
{code}
because the builtin catalog is named default_catalog and all temporary views are registered to it. This may no longer be a problem after FLIP-64.
> support multiple catalogs in Flink Table SQL > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 > Reporter: yuemeng > Priority: Critical > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-14666) support multiple catalogs in Flink Table SQL
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16970052#comment-16970052 ] yuemeng edited comment on FLINK-14666 at 11/8/19 10:40 AM: --- [~dwysakowicz] Thanks for your reply. The fully qualified name works well, but in Flink 1.9.0 temporary tables also need to be fully qualified, such as:
{code}
tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from default_catalog.default_database.v1");
{code}
because the builtin catalog is named default_catalog and all temporary views are registered to it. This may no longer be a problem after FLIP-64.
was (Author: yuemeng): [~dwysakowicz] Thanks for your reply. The fully qualified name works well, but in Flink 1.9.0 temporary tables also need to be fully qualified, such as:
{code}
tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from default_catalog.default_database.v1");
{code}
because the builtin catalog is named default_catalog and all temporary views are registered to it. This may no longer be a problem after FLIP-64.
> support multiple catalogs in Flink Table SQL > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 > Reporter: yuemeng > Priority: Critical > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14666) support multiple catalogs in Flink Table SQL
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16971278#comment-16971278 ] yuemeng commented on FLINK-14666: - [~dwysakowicz] I will close this issue because of your new feature in the latest master.
> support multiple catalogs in Flink Table SQL > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 > Reporter: yuemeng > Priority: Critical > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-14666) support multiple catalogs in Flink Table SQL
[ https://issues.apache.org/jira/browse/FLINK-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng closed FLINK-14666. --- Resolution: Implemented
> support multiple catalogs in Flink Table SQL > --- > > Key: FLINK-14666 > URL: https://issues.apache.org/jira/browse/FLINK-14666 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.8.0, 1.8.2, 1.9.0, 1.9.1 > Reporter: yuemeng > Priority: Critical > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode
yuemeng created FLINK-20935: --- Summary: can't write flink configuration to tmp file and add it to local resource in yarn session mode Key: FLINK-20935 URL: https://issues.apache.org/jira/browse/FLINK-20935 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.12.0, 1.13.0 Reporter: yuemeng
In Flink 1.12.0 or the latest version, when we execute a command such as bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st, the deployment fails with the following errors:
{code}
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
	at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
Caused by: java.io.FileNotFoundException: File does not exist: /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
{code}
When the startAppMaster method of YarnClusterDescriptor is called, it tries to write the flink configuration to a tmp file and add it to the local resources, but the following code makes the tmp file be treated as residing on a distributed file system:
{code}
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = null;
try {
    tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
    BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

    String flinkConfigKey = "flink-conf.yaml";
    fileUploader.registerSingleLocalResource(
            flinkConfigKey,
            new Path(tmpConfigurationFile.getAbsolutePath()),
            "",
            LocalResourceType.FILE,
            true,
            true);
    classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
    if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
        LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
    }
}
{code}
The {{tmpConfigurationFile.getAbsolutePath()}} method returns a path without a file scheme, so the file system is considered a distributed file system.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
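A minimal sketch of the fix direction the report suggests: build the Path from the file's URI so the file:// scheme survives, mirroring how the job graph file is registered elsewhere in startAppMaster (see the next comment):
{code:java}
// Building the Path from the file URI preserves the "file://" scheme, so the
// tmp file is not resolved against fs.default-scheme (e.g. hdfs) by mistake.
fileUploader.registerSingleLocalResource(
        flinkConfigKey,
        new Path(tmpConfigurationFile.toURI()), // instead of getAbsolutePath()
        "",
        LocalResourceType.FILE,
        true,
        true);
{code}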
[jira] [Commented] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode
[ https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263861#comment-17263861 ] yuemeng commented on FLINK-20935: - [~fly_in_gis], yes, I set fs.default-scheme to hdfs, but creating a tmp file to write the flink configuration and adding it to the local resources doesn't need to rely on this configuration, just like the way we handle the jobGraph:
{code}
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
    File tmpJobGraphFile = null;
    try {
        tmpJobGraphFile = File.createTempFile(appId.toString(), null);
        try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
            obOutput.writeObject(jobGraph);
        }
        final String jobGraphFilename = "job.graph";
        configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
        fileUploader.registerSingleLocalResource(
                jobGraphFilename,
                new Path(tmpJobGraphFile.toURI()),
                "",
                LocalResourceType.FILE,
                true,
                false);
        classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
    } catch (Exception e) {
        LOG.warn("Add job graph to local resource fail.");
        throw e;
    } finally {
        if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
            LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
        }
    }
}
{code}
> can't write flink configuration to tmp file and add it to local resource in yarn session mode > --- > > Key: FLINK-20935 > URL: https://issues.apache.org/jira/browse/FLINK-20935 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN > Affects Versions: 1.12.0, 1.13.0 > Reporter: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode
[ https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264565#comment-17264565 ] yuemeng commented on FLINK-20935: - [~trohrmann] [~fly_in_gis], yes, this is a good way to fix this problem. I will fix it this way.
> can't write flink configuration to tmp file and add it to local resource in yarn session mode > --- > > Key: FLINK-20935 > URL: https://issues.apache.org/jira/browse/FLINK-20935 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN > Affects Versions: 1.12.0, 1.13.0 > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode
[ https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264565#comment-17264565 ] yuemeng edited comment on FLINK-20935 at 1/14/21, 3:02 AM: --- [~trohrmann] [~fly_in_gis], yes, I agree with that. I will check all the calls of {{registerSingleLocalResource}} to make sure that the local path is qualified.
was (Author: yuemeng): [~trohrmann] [~fly_in_gis], yes, this is a good way to fix this problem. I will fix it this way.
> can't write flink configuration to tmp file and add it to local resource in yarn session mode > --- > > Key: FLINK-20935 > URL: https://issues.apache.org/jira/browse/FLINK-20935 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN > Affects Versions: 1.12.0, 1.13.0 > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode
[ https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17264565#comment-17264565 ] yuemeng edited comment on FLINK-20935 at 1/18/21, 2:44 AM: --- [~trohrmann] [~fly_in_gis], yes, I agree with that. I will check all the calls of {{registerSingleLocalResource}} to make sure that the local path is qualified.
[~trohrmann] [~fly_in_gis], I have checked all the places that call {{registerSingleLocalResource}}; for now, only this place can cause the problem, because the other local files always have a file scheme (file:///).
was (Author: yuemeng): [~trohrmann] [~fly_in_gis], yes, I agree with that. I will check all the calls of {{registerSingleLocalResource}} to make sure that the local path is qualified.
> can't write flink configuration to tmp file and add it to local resource in yarn session mode > --- > > Key: FLINK-20935 > URL: https://issues.apache.org/jira/browse/FLINK-20935 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN > Affects Versions: 1.12.0, 1.13.0 > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode
[ https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268446#comment-17268446 ] yuemeng commented on FLINK-20935: - [~trohrmann], yes, JerryTaoTao is myself; I will take a look at your comment. [~fly_in_gis], if we create a local file, we can't treat it as a distributed file path. Any operation like path.getFileSystem(yarnConfiguration).makeQualified(path) or fileSystem.makeQualified(resourcePath) will only turn it into a qualified distributed file path, but in fact it is just a local file that is uploaded to HDFS and downloaded to the local disk by the container.
> can't write flink configuration to tmp file and add it to local resource in yarn session mode > --- > > Key: FLINK-20935 > URL: https://issues.apache.org/jira/browse/FLINK-20935 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN > Affects Versions: 1.12.0, 1.13.0 > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
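The distinction drawn in this comment can be reproduced in isolation. A small demo against the Hadoop FileSystem API, assuming fs.defaultFS points at an HDFS namenode (the hostname is illustrative):
{code:java}
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class QualifyDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://nn:8020"); // illustrative namenode address

        FileSystem fs = FileSystem.get(conf);

        // A scheme-less path is qualified against the default (distributed) filesystem...
        Path schemeless = new Path("/tmp/app-flink-conf.yaml");
        System.out.println(fs.makeQualified(schemeless)); // hdfs://nn:8020/tmp/app-flink-conf.yaml

        // ...while a Path built from a file URI keeps pointing at the local disk.
        Path local = new Path(new File("/tmp/app-flink-conf.yaml").toURI());
        System.out.println(local); // file:/tmp/app-flink-conf.yaml
    }
}
{code}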
[jira] [Commented] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode
[ https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269941#comment-17269941 ] yuemeng commented on FLINK-20935: - [~trohrmann], I have updated the PR; can you review it? Thanks.
> can't write flink configuration to tmp file and add it to local resource in yarn session mode > --- > > Key: FLINK-20935 > URL: https://issues.apache.org/jira/browse/FLINK-20935 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN > Affects Versions: 1.12.0, 1.13.0 > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-29801) OperatorCoordinator needs a way to access the metricGroup interface
yuemeng created FLINK-29801: --- Summary: OperatorCoordinator needs a way to access the metricGroup interface Key: FLINK-29801 URL: https://issues.apache.org/jira/browse/FLINK-29801 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: yuemeng
Currently, we have no way to get a metric group instance in an OperatorCoordinator. In some cases we may want to report metrics from an OperatorCoordinator, such as in the Flink/Hudi integration scenario, where some metadata is sent to the operator coordinator to be committed to HDFS or HMS, and we also need to report some metrics from the operator coordinator for monitoring purposes.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
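For context, a sketch of the metrics such a coordinator would register, written against the existing org.apache.flink.metrics.MetricGroup API. The group itself would have to come from the coordinator context, which is exactly the accessor this ticket asks for and which does not exist today; all names are illustrative:
{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Sketch: commit metrics a coordinator (e.g. Hudi's stream write coordinator)
// could register, once a MetricGroup is reachable from the coordinator context.
public final class CoordinatorCommitMetrics {

    private final Counter commitsCompleted;
    private final Counter commitsFailed;

    public CoordinatorCommitMetrics(MetricGroup coordinatorGroup) {
        this.commitsCompleted = coordinatorGroup.counter("commitsCompleted");
        this.commitsFailed = coordinatorGroup.counter("commitsFailed");
    }

    public void onCommitSuccess() {
        commitsCompleted.inc();
    }

    public void onCommitFailure() {
        commitsFailed.inc();
    }
}
{code}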
[jira] [Commented] (FLINK-29801) OperatorCoordinator needs a way to access the metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626419#comment-17626419 ] yuemeng commented on FLINK-29801: - [~jark] [~rmetzger] Can you review this improvement? We really need it in Hudi to report some metrics.
> OperatorCoordinator needs a way to access the metricGroup interface > --- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination > Reporter: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29801) OperatorCoordinator needs a way to access the metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626423#comment-17626423 ] yuemeng commented on FLINK-29801: - [~danny0405] Can you review this improvement? We need it to report some metrics in the stream write coordinator.
> OperatorCoordinator needs a way to access the metricGroup interface > --- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination > Reporter: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29801) OperatorCoordinator needs a way to access the metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635341#comment-17635341 ] yuemeng commented on FLINK-29801: - [~ruanhang1993] [~zhuzh] First, thanks a lot for your reply. The following answers your two questions:
# Each implementation of OperatorCoordinator will hold a metric group instance with which it can register anything it wants. This feature is not only for the SourceCoordinator; e.g., what about the other operator coordinators that don't have such a context?
# Each operator coordinator's metric group is distinguished by its metric group name.
Second, this implementation has already been working in our production environment. If we want a new design, we can push it forward and finish it quickly, because we really need it.
> OperatorCoordinator needs a way to access the metricGroup interface > --- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.20.10#820010)
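A sketch of what answer 2 means in MetricGroup terms: each coordinator hangs its metrics off its own named sub-group, so two coordinators never collide. The parent group and the group names are illustrative assumptions, not an existing API:
{code:java}
import org.apache.flink.metrics.MetricGroup;

/** Sketch: per-coordinator sub-groups, keyed by the coordinator's name. */
public final class CoordinatorMetricGroups {

    // parentGroup is assumed to be supplied by the runtime, e.g. the job's metric group
    public static MetricGroup forCoordinator(MetricGroup parentGroup, String coordinatorName) {
        return parentGroup.addGroup("operatorCoordinator").addGroup(coordinatorName);
    }
}
{code}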
[jira] [Commented] (FLINK-29801) OperatorCoordinator needs a way to access the metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636464#comment-17636464 ] yuemeng commented on FLINK-29801: - Hi [~ruanhang1993], [~zhuzh]. Further consideration is good; I'm very happy to hear that and look forward to this FLIP. I will follow its progress, and it would be even better if I could help finish it. If any part needs me to join in, just let me know.
> OperatorCoordinator needs a way to access the metricGroup interface > --- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29801) OperatorCoordinator needs a way to access the metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638122#comment-17638122 ] yuemeng commented on FLINK-29801: - [~ruanhang1993] I will review this FLIP later; I'm looking forward to working with you on it. Can you tell me your Slack or DingTalk ID?
> OperatorCoordinator needs a way to access the metricGroup interface > --- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29801) OperatorCoordinator needs a way to access the metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638202#comment-17638202 ] yuemeng commented on FLINK-29801: - [~ruanhang1993] Here is my email (272614...@qq.com).
> OperatorCoordinator needs a way to access the metricGroup interface > --- > > Key: FLINK-29801 > URL: https://issues.apache.org/jira/browse/FLINK-29801 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available >
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
yuemeng created FLINK-7145: -- Summary: Flink SQL API should support multiple parameters for UserDefinedAggFunction Key: FLINK-7145 URL: https://issues.apache.org/jira/browse/FLINK-7145 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: yuemeng Priority: Critical
UDAFs such as topK, and some other UDAFs with bloom filters, need more than one parameter; we should make Flink SQL support this. Based on Flink SQL supporting DML and multiple-parameter UDAFs, we could execute SQL like:
{code}
CREATE TEMPORARY function 'TOPK' AS 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';

INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP BY id;
{code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
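For reference, a multiple-parameter UDAF is already expressible on the Table API side; a minimal weighted-average sketch (all names illustrative) whose two-argument accumulate() is what the SQL layer would need to resolve:
{code:java}
import org.apache.flink.table.functions.AggregateFunction;

// Sketch of a UDAF whose accumulate() takes two parameters (value and weight).
public class WeightedAvg extends AggregateFunction<Double, WeightedAvg.Acc> {

    public static class Acc {
        public double sum = 0.0;
        public long weight = 0L;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Double getValue(Acc acc) {
        return acc.weight == 0L ? null : acc.sum / acc.weight;
    }

    // two parameters besides the accumulator: this is what multi-parameter
    // support in SQL needs to resolve
    public void accumulate(Acc acc, double value, long weight) {
        acc.sum += value * weight;
        acc.weight += weight;
    }
}
{code}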
[jira] [Created] (FLINK-7146) Flink SQL should support DDL
yuemeng created FLINK-7146: -- Summary: Flink SQL should support DDL Key: FLINK-7146 URL: https://issues.apache.org/jira/browse/FLINK-7146 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: yuemeng
For now, Flink SQL can't support DDL; we can only register a table by calling registerTableInternal in TableEnvironment. We should support DDL in SQL, such as creating a table or creating a function, like:
{code}
CREATE TABLE kafka_source (
  id INT,
  price INT
) PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = 'xx:9092',
  group_id = 'test'
);

CREATE TABLE db_sink (
  id INT,
  price DOUBLE
) PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'udaf_test',
  url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

CREATE TEMPORARY function 'AVGUDAF' AS 'com.x.server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';

INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source group by id
{code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
[ https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081847#comment-16081847 ] yuemeng edited comment on FLINK-7145 at 7/11/17 8:20 AM: - [~fhueske] Thanks a lot for the reply. Some days ago, when I tried to make Flink UDAFs support multiple parameters, I ran into a problem. Suppose we register more than one function with the same name topK in the schema:
1) two functions with the same name:
Top10(INT, INT)
Top10(INT, DOUBLE)
2) execute the SQL
{code}
SELECT TUMBLE_END(rowtime, INTERVAL '10' MINUTE), top10(cnt, page) AS topPages
FROM pageVisits
GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE);
{code}
Suppose cnt is INT and page is DOUBLE; for now Calcite can't find the exact function.
3) you can check this PR: [https://issues.apache.org/jira/browse/CALCITE-1833]
was (Author: yuemeng): [~fhueske] Thanks a lot for the reply. Some days ago, when I tried to make Flink UDAFs support multiple parameters, I ran into a problem. Suppose we register more than one function with the same name topK in the schema:
1) two functions with the same name:
Top10(INT, INT)
Top10(INT, DOUBLE)
2) execute the SQL
{code}
SELECT TUMBLE_END(rowtime, INTERVAL '10' MINUTE), top10(cnt, page) AS topPages
FROM pageVisits
GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE);
{code}
Suppose cnt is INT and page is DOUBLE; for now Calcite can't find the exact function.
3) you can check this PR: [https://issues.apache.org/jira/browse/CALCITE-1833]
> Flink SQL API should support multiple parameters for UserDefinedAggFunction > --- > > Key: FLINK-7145 > URL: https://issues.apache.org/jira/browse/FLINK-7145 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: yuemeng > Priority: Critical >
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7148) Flink SQL API should support DDL
yuemeng created FLINK-7148: -- Summary: Flink SQL API should support DDL Key: FLINK-7148 URL: https://issues.apache.org/jira/browse/FLINK-7148 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: yuemeng
For now, Flink SQL can't support DDL operations; a user can only register a table by calling registerTableInternal in TableEnvironment. We should support DDL such as creating a table or creating a function, like:
{code}
CREATE TABLE kafka_source (
  id INT,
  price INT
) PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = ':9092',
  group_id = 'test'
);

CREATE TABLE db_sink (
  id INT,
  price DOUBLE
) PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'udaf_test',
  url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

CREATE TEMPORARY function 'AVGUDAF' AS 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';

INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source group by id
{code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
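For contrast with the proposed DDL, a sketch of what registering the same source requires programmatically today; the builder calls mirror the Kafka010JsonTableSource example quoted earlier in this digest, and all connector details are illustrative:
{code:java}
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RegisterWithoutDdl {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");

        // everything the proposed CREATE TABLE statement would express
        // has to be spelled out in code today
        KafkaTableSource source = Kafka010JsonTableSource.builder()
                .forTopic("test")
                .withKafkaProperties(props)
                .withSchema(TableSchema.builder()
                        .field("id", Types.INT())
                        .field("price", Types.INT())
                        .build())
                .build();

        tEnv.registerTableSource("kafka_source", source);
    }
}
{code}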
[jira] [Updated] (FLINK-7148) Flink SQL API support DDL
[ https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-7148: --- Priority: Critical (was: Major) > Flink SQL API support DDL > -- > > Key: FLINK-7148 > URL: https://issues.apache.org/jira/browse/FLINK-7148 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Priority: Critical > > For now, Flink SQL does not support DDL operations; a user can only register a table > by calling registerTableInternal in TableEnvironment. We should support DDL such > as creating a table or creating a function, for example: > {code} > CREATE TABLE kafka_source ( > id INT, > price INT > ) PROPERTIES ( > category = 'source', > type = 'kafka', > version = '0.9.0.1', > separator = ',', > topic = 'test', > brokers = ':9092', > group_id = 'test' > ); > CREATE TABLE db_sink ( > id INT, > price DOUBLE > ) PROPERTIES ( > category = 'sink', > type = 'mysql', > table_name = 'udaf_test', > url = > 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8', > username = 'ds_dev', > password = 's]k51_(>R' > ); > CREATE TEMPORARY FUNCTION 'AVGUDAF' AS > 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF'; > INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
[ https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081847#comment-16081847 ] yuemeng commented on FLINK-7145: [~fhueske] Thanks a lot for the reply. Some days ago, when I tried to make Flink UDAFs support multiple parameters, I ran into a problem. Suppose we register more than one function with the same name topK in the schema: 1) two functions with the same name: Top10(INT, INT) and Top10(INT, DOUBLE); 2) execute the SQL: {code} SELECT TUMBLE_END(rowtime, INTERVAL '10' MINUTE), top10(cnt, page) AS topPages FROM pageVisits GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE); {code} With cnt as INT and page as DOUBLE, Calcite currently cannot find the exact function. 3) See this Calcite issue: [https://issues.apache.org/jira/browse/CALCITE-1833] > Flink SQL API should support multiple parameters for UserDefinedAggFunction > --- > > Key: FLINK-7145 > URL: https://issues.apache.org/jira/browse/FLINK-7145 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Priority: Critical > > UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one > parameter; Flink SQL should support this. > Once Flink SQL supports the necessary DDL and DML, and multi-parameter UDAFs, we can execute SQL > like: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7148) Flink SQL API support DDL
[ https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-7148: --- Issue Type: New Feature (was: Bug) > Flink SQL API support DDL > -- > > Key: FLINK-7148 > URL: https://issues.apache.org/jira/browse/FLINK-7148 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > For now, Flink SQL does not support DDL operations; a user can only register a table > by calling registerTableInternal in TableEnvironment. We should support DDL such > as creating a table or creating a function, for example: > {code} > CREATE TABLE kafka_source ( > id INT, > price INT > ) PROPERTIES ( > category = 'source', > type = 'kafka', > version = '0.9.0.1', > separator = ',', > topic = 'test', > brokers = ':9092', > group_id = 'test' > ); > CREATE TABLE db_sink ( > id INT, > price DOUBLE > ) PROPERTIES ( > category = 'sink', > type = 'mysql', > table_name = 'udaf_test', > url = > 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8', > username = 'ds_dev', > password = 's]k51_(>R' > ); > CREATE TEMPORARY FUNCTION 'AVGUDAF' AS > 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF'; > INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
[ https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081877#comment-16081877 ] yuemeng commented on FLINK-7145: [~fhueske] OK, thanks for your reply. I see; I will close this issue. For Flink, updating the Calcite version will solve this problem. I will test this new feature in Flink. > Flink SQL API should support multiple parameters for UserDefinedAggFunction > --- > > Key: FLINK-7145 > URL: https://issues.apache.org/jira/browse/FLINK-7145 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one > parameter; Flink SQL should support this. > Once Flink SQL supports the necessary DDL and DML, and multi-parameter UDAFs, we can execute SQL > like: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7148) Flink SQL API support DDL
[ https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081862#comment-16081862 ] yuemeng commented on FLINK-7148: [~fhueske] Can you check this feature? If so, can you assign this issue to me? > Flink SQL API support DDL > -- > > Key: FLINK-7148 > URL: https://issues.apache.org/jira/browse/FLINK-7148 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Priority: Critical > > For now, Flink SQL does not support DDL operations; a user can only register a table > by calling registerTableInternal in TableEnvironment. We should support DDL such > as creating a table or creating a function, for example: > {code} > CREATE TABLE kafka_source ( > id INT, > price INT > ) PROPERTIES ( > category = 'source', > type = 'kafka', > version = '0.9.0.1', > separator = ',', > topic = 'test', > brokers = ':9092', > group_id = 'test' > ); > CREATE TABLE db_sink ( > id INT, > price DOUBLE > ) PROPERTIES ( > category = 'sink', > type = 'mysql', > table_name = 'udaf_test', > url = > 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8', > username = 'ds_dev', > password = 's]k51_(>R' > ); > CREATE TEMPORARY FUNCTION 'AVGUDAF' AS > 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF'; > INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7148) Flink SQL API support DDL
[ https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081965#comment-16081965 ] yuemeng edited comment on FLINK-7148 at 7/11/17 9:29 AM: - [~jark] [~fhueske] This is a duplicate of FLINK-6962; I will close this issue. was (Author: yuemeng): [~jark] [~fhueske] This is a duplicate of FLINK-6962; I will close this issue. > Flink SQL API support DDL > -- > > Key: FLINK-7148 > URL: https://issues.apache.org/jira/browse/FLINK-7148 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Priority: Critical > > For now, Flink SQL does not support DDL operations; a user can only register a table > by calling registerTableInternal in TableEnvironment. We should support DDL such > as creating a table or creating a function, for example: > {code} > CREATE TABLE kafka_source ( > id INT, > price INT > ) PROPERTIES ( > category = 'source', > type = 'kafka', > version = '0.9.0.1', > separator = ',', > topic = 'test', > brokers = ':9092', > group_id = 'test' > ); > CREATE TABLE db_sink ( > id INT, > price DOUBLE > ) PROPERTIES ( > category = 'sink', > type = 'mysql', > table_name = 'udaf_test', > url = > 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8', > username = 'ds_dev', > password = 's]k51_(>R' > ); > CREATE TEMPORARY FUNCTION 'AVGUDAF' AS > 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF'; > INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7148) Flink SQL API support DDL
[ https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081965#comment-16081965 ] yuemeng commented on FLINK-7148: [~jark] [~fhueske] This is a duplicate of FLINK-6962; I will close this issue. > Flink SQL API support DDL > -- > > Key: FLINK-7148 > URL: https://issues.apache.org/jira/browse/FLINK-7148 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Priority: Critical > > For now, Flink SQL does not support DDL operations; a user can only register a table > by calling registerTableInternal in TableEnvironment. We should support DDL such > as creating a table or creating a function, for example: > {code} > CREATE TABLE kafka_source ( > id INT, > price INT > ) PROPERTIES ( > category = 'source', > type = 'kafka', > version = '0.9.0.1', > separator = ',', > topic = 'test', > brokers = ':9092', > group_id = 'test' > ); > CREATE TABLE db_sink ( > id INT, > price DOUBLE > ) PROPERTIES ( > category = 'sink', > type = 'mysql', > table_name = 'udaf_test', > url = > 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8', > username = 'ds_dev', > password = 's]k51_(>R' > ); > CREATE TEMPORARY FUNCTION 'AVGUDAF' AS > 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF'; > INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7148) Flink SQL API support DDL
[ https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng closed FLINK-7148. -- Resolution: Duplicate > Flink SQL API support DDL > -- > > Key: FLINK-7148 > URL: https://issues.apache.org/jira/browse/FLINK-7148 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Priority: Critical > > For now, Flink SQL does not support DDL operations; a user can only register a table > by calling registerTableInternal in TableEnvironment. We should support DDL such > as creating a table or creating a function, for example: > {code} > CREATE TABLE kafka_source ( > id INT, > price INT > ) PROPERTIES ( > category = 'source', > type = 'kafka', > version = '0.9.0.1', > separator = ',', > topic = 'test', > brokers = ':9092', > group_id = 'test' > ); > CREATE TABLE db_sink ( > id INT, > price DOUBLE > ) PROPERTIES ( > category = 'sink', > type = 'mysql', > table_name = 'udaf_test', > url = > 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8', > username = 'ds_dev', > password = 's]k51_(>R' > ); > CREATE TEMPORARY FUNCTION 'AVGUDAF' AS > 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF'; > INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7148) Flink SQL API support DDL
[ https://issues.apache.org/jira/browse/FLINK-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082009#comment-16082009 ] yuemeng commented on FLINK-7148: [~fhueske] Thanks, I will do that. > Flink SQL API support DDL > -- > > Key: FLINK-7148 > URL: https://issues.apache.org/jira/browse/FLINK-7148 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Priority: Critical > > For now, Flink SQL does not support DDL operations; a user can only register a table > by calling registerTableInternal in TableEnvironment. We should support DDL such > as creating a table or creating a function, for example: > {code} > CREATE TABLE kafka_source ( > id INT, > price INT > ) PROPERTIES ( > category = 'source', > type = 'kafka', > version = '0.9.0.1', > separator = ',', > topic = 'test', > brokers = ':9092', > group_id = 'test' > ); > CREATE TABLE db_sink ( > id INT, > price DOUBLE > ) PROPERTIES ( > category = 'sink', > type = 'mysql', > table_name = 'udaf_test', > url = > 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8', > username = 'ds_dev', > password = 's]k51_(>R' > ); > CREATE TEMPORARY FUNCTION 'AVGUDAF' AS > 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF'; > INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7151) FLINK SQL support create temporary function and table
yuemeng created FLINK-7151: -- Summary: FLINK SQL support create temporary function and table Key: FLINK-7151 URL: https://issues.apache.org/jira/browse/FLINK-7151 Project: Flink Issue Type: New Feature Reporter: yuemeng Based on CREATE TEMPORARY FUNCTION and CREATE TABLE support, we can register a UDF, UDAF, or UDTF using SQL: {code} CREATE TEMPORARY FUNCTION 'TOPK' AS 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP BY id; {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
[ https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082016#comment-16082016 ] yuemeng commented on FLINK-7145: [~jark] For a TopN with different accumulate(...) overloads, there are some questions: 1) Which accumulate method should be called in the aggregate function? The aggregate function only calls accumulate; there is no other mechanism to find the exact accumulate method. 2) Suppose a UDAF such as topN has multiple parameters with different types; how do we ensure that the correct function is matched? > Flink SQL API should support multiple parameters for UserDefinedAggFunction > --- > > Key: FLINK-7145 > URL: https://issues.apache.org/jira/browse/FLINK-7145 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one > parameter; Flink SQL should support this. > Once Flink SQL supports the necessary DDL and DML, and multi-parameter UDAFs, we can execute SQL > like: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
[ https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082016#comment-16082016 ] yuemeng edited comment on FLINK-7145 at 7/11/17 10:34 AM: -- [~jark] [~fhueske] For a TopN with different accumulate(...) overloads, there are some questions: 1) Which accumulate method should be called in the aggregate function? The aggregate function only calls accumulate; there is no other mechanism to find the exact accumulate method. 2) Suppose a UDAF such as topN has multiple parameters with different types; how do we ensure that the correct function is matched? was (Author: yuemeng): [~jark] For a TopN with different accumulate(...) overloads, there are some questions: 1) Which accumulate method should be called in the aggregate function? The aggregate function only calls accumulate; there is no other mechanism to find the exact accumulate method. 2) Suppose a UDAF such as topN has multiple parameters with different types; how do we ensure that the correct function is matched? > Flink SQL API should support multiple parameters for UserDefinedAggFunction > --- > > Key: FLINK-7145 > URL: https://issues.apache.org/jira/browse/FLINK-7145 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one > parameter; Flink SQL should support this. > Once Flink SQL supports the necessary DDL and DML, and multi-parameter UDAFs, we can execute SQL > like: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
[ https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082075#comment-16082075 ] yuemeng commented on FLINK-7145: [~fhueske] Can you tell me where I can run the following SQL? {code} SELECT TUMBLE_END(rowtime, INTERVAL '10' MINUTE), top10(cnt, page) AS topPages FROM pageVisits GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE) {code} I can't find any unit test with this SQL in the current master. Are the cnt and page fields of the same type? > Flink SQL API should support multiple parameters for UserDefinedAggFunction > --- > > Key: FLINK-7145 > URL: https://issues.apache.org/jira/browse/FLINK-7145 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one > parameter; Flink SQL should support this. > Once Flink SQL supports the necessary DDL and DML, and multi-parameter UDAFs, we can execute SQL > like: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
[ https://issues.apache.org/jira/browse/FLINK-7145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng closed FLINK-7145. -- Resolution: Duplicate > Flink SQL API should support multiple parameters for UserDefinedAggFunction > --- > > Key: FLINK-7145 > URL: https://issues.apache.org/jira/browse/FLINK-7145 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > UDAFs such as topK, and other UDAFs that use a bloom filter, need more than one > parameter; Flink SQL should support this. > Once Flink SQL supports the necessary DDL and DML, and multi-parameter UDAFs, we can execute SQL > like: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083380#comment-16083380 ] yuemeng commented on FLINK-7151: [~fhueske] Thanks for your reply. I will split this issue into two issues with more specific titles. Thank you. [~jark] A user can use a temporary function to register a temporary UDF or UDAF for their own job, just like in Hive. Suppose a user wants to use a special UDF (not a built-in function) only for their own job; what do we do? We can create a temporary function for that job: {code} Hive: create temporary function mytest as 'test.udf.ToLowerCase'; select mytest(test.name) from test; {code} A temporary table is a conceptual table, not a physical table; we can use it for streaming ETL logic: {code} CREATE TABLE kafka_source ( id INT, name VARCHAR(100), price INT, comment VARCHAR(100) ) PROPERTIES ( category = 'source', type = 'kafka', version = '0.9.0.1', separator = ',', topic = 'test', brokers = ':9092', group_id = 'test' ); CREATE TABLE tmp ( id INT, name VARCHAR(100), price INT ) PROPERTIES ( category = 'tmp' ); CREATE TABLE db_sink ( id INT, name VARCHAR(100), price INT ) PROPERTIES ( category = 'sink', type = 'mysql', table_name = 'test', url = 'jdbc:mysql://127.0.0.1:3307/ds?useUnicode=true&characterEncoding=UTF8', username = 'ds_dev', password = 's]k51_(>R' ); -- INSERT INTO db_sink SELECT id, name, price FROM kafka_source; -- INSERT INTO db_sink SELECT id, name, sum(price) FROM kafka_source GROUP BY id, name HAVING sum(price) > 1 AND id < 10; INSERT INTO tmp SELECT id, name, price + 1.0 FROM kafka_source; INSERT INTO db_sink SELECT id, name, price + 1 AS price FROM tmp; {code} > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on CREATE TEMPORARY FUNCTION and CREATE TABLE support, we can register a UDF, UDAF, or UDTF > using SQL: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
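To ground the Hive analogy above, here is a minimal sketch (Scala, Flink 1.3-era Table API) of the per-job registration Flink offers today, which CREATE TEMPORARY FUNCTION would replace; the ToLowerCase class is a hypothetical UDF, not existing Flink code.
{code}
// A minimal sketch of today's per-job (i.e. effectively "temporary")
// function registration; ToLowerCase is a hypothetical UDF.
import org.apache.flink.table.functions.ScalarFunction

class ToLowerCase extends ScalarFunction {
  def eval(s: String): String = if (s == null) null else s.toLowerCase
}

// At job-setup time; the registration lives only in this TableEnvironment,
// which is what makes it scoped to the job rather than global:
//   tEnv.registerFunction("mytest", new ToLowerCase)
//   tEnv.sql("SELECT mytest(name) FROM test")
{code}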
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083845#comment-16083845 ] yuemeng commented on FLINK-7151: [~jark] Yes, I think extending the ExternalCatalog to support registering external functions is a very good approach; maybe we can register a function with a SQL DDL statement instead of the API. Some questions here: 1) All built-in functions in Flink are registered into the schema on every job cycle, which is an ugly approach. 2) Aggregate functions such as sum and avg should be matched by Calcite; we can directly get the exact function from the AggCall instead of hard-coding it. > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on CREATE TEMPORARY FUNCTION and CREATE TABLE support, we can register a UDF, UDAF, or UDTF > using SQL: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
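As a purely hypothetical sketch of the catalog-based registration discussed above (ExternalFunctionCatalog is an assumption for illustration; it is not an existing Flink interface), the idea might look roughly like this:
{code}
// Hypothetical sketch of registering external functions through a catalog;
// none of these trait/method names exist in Flink.
import org.apache.flink.table.functions.UserDefinedFunction

trait ExternalFunctionCatalog {
  // Record a function created via CREATE TEMPORARY FUNCTION 'name' AS 'class'.
  def registerFunction(name: String, className: String): Unit

  // Resolve the name at planning time, instantiating the class on demand.
  def getFunction(name: String): UserDefinedFunction
}

// A CREATE TEMPORARY FUNCTION statement would then reduce to a catalog call:
//   catalog.registerFunction("TOPK", "com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF")
{code}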
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089346#comment-16089346 ] yuemeng commented on FLINK-7151: [~fhueske] Can you assign this issue to me? I want to implement this for Flink. Thanks a lot. > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on CREATE TEMPORARY FUNCTION and CREATE TABLE support, we can register a UDF, UDAF, or UDTF > using SQL: > {code} > CREATE TEMPORARY FUNCTION 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)