[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585019#comment-16585019 ]

ASF GitHub Bot commented on FLINK-10145:

yanghua commented on a change in pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6576#discussion_r211091901

## File path: docs/dev/table/functions.md ##
@@ -2784,6 +2808,18 @@ STRING.substring(INT1, INT2)
+
+
+{% highlight scala %}
+STRING.replace(STRING1, STRING2)
+{% endhighlight %}
+
+
+Returns a new string from STRING replaced STRING1(non-overlapping) with STRING2.
+E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns "zab"

Review comment:
replace all '' to ""

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add replace supported in TableAPI and SQL
> -----------------------------------------
>
> Key: FLINK-10145
> URL: https://issues.apache.org/jira/browse/FLINK-10145
> Project: Flink
> Issue Type: Sub-task
> Components: Table API SQL
> Reporter: Guibo Pan
> Assignee: Guibo Pan
> Priority: Major
> Labels: pull-request-available
>
> replace is an useful function for String.
> for example:
> {code:java}
> select replace("Hello World", "World", "Flink") // return "Hello Flink"
> select replace("ababab", "abab", "z") // return "zab"
> {code}
> It is supported as a UDF in Hive, more details please see[1]
> [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-StringFunctions

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
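The left-to-right, non-overlapping behaviour described in the issue can be illustrated with plain Java, whose `String.replace(CharSequence, CharSequence)` does not rescan text it has already replaced. This is only a sketch in standard Java, not the Flink implementation from the pull request; the class name `ReplaceSemantics` and its wrapper method are made up for illustration.

```java
public class ReplaceSemantics {
    // Thin wrapper over java.lang.String#replace, used only to illustrate
    // left-to-right, non-overlapping replacement. This is not Flink code.
    public static String replace(String str, String search, String replacement) {
        return str.replace(search, replacement);
    }

    public static void main(String[] args) {
        // Single occurrence is swapped in place.
        System.out.println(replace("Hello World", "World", "Flink")); // Hello Flink
        // "abab" matches at index 0; the remaining "ab" is too short to match again.
        System.out.println(replace("ababab", "abab", "z"));           // zab
        // An empty replacement deletes every occurrence.
        System.out.println(replace("ababab", "a", ""));               // bbb
    }
}
```

The second call shows why the result is "zab" rather than "zz": once the first four characters are consumed, matching resumes after them.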
[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585022#comment-16585022 ] ASF GitHub Bot commented on FLINK-10145: yanghua commented on a change in pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576#discussion_r211091915 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala ## @@ -410,3 +410,27 @@ case class ToBase64(child: Expression) extends UnaryExpression with InputTypeSpe override def toString: String = s"($child).toBase64" } + +/** + * Returns the string `str` with all non-overlapping occurrences Review comment: remove all the "``" , just describe the function
[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585024#comment-16585024 ] ASF GitHub Bot commented on FLINK-10145: yanghua commented on a change in pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576#discussion_r211091930 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala ## @@ -149,6 +149,9 @@ class SqlExpressionTest extends ExpressionTestBase { testSqlApi("RPAD('hi',4,'??')", "hi??") testSqlApi("FROM_BASE64('aGVsbG8gd29ybGQ=')", "hello world") testSqlApi("TO_BASE64('hello world')", "aGVsbG8gd29ybGQ=") +testSqlApi("REPLACE('hello world', 'world', 'flink')", "hello flink") +testSqlApi("REPLACE('ababab', 'abab', 'Z')", "Zab") +testSqlApi("REPLACE('ababab', 'a', '')", "bbb") Review comment: It seems this test file is a show case, so just add one example .
[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585023#comment-16585023 ] ASF GitHub Bot commented on FLINK-10145: yanghua commented on a change in pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576#discussion_r211091900 ## File path: docs/dev/table/functions.md ## @@ -2592,6 +2604,18 @@ STRING.substring(INT1, INT2) + + +{% highlight java %} +STRING.replace(STRING1, STRING2) +{% endhighlight %} + + +Returns a new string from STRING replaced STRING1(non-overlapping) with STRING2. +E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns "zab" Review comment: replace all '' to ""
[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585021#comment-16585021 ] ASF GitHub Bot commented on FLINK-10145: yanghua commented on a change in pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576#discussion_r211091929 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -94,6 +94,21 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "his is a test String.") } + @Test + def testReplace(): Unit = { Review comment: add more test for invalid input such : empty string, null or a string is not available of the original string
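The edge cases the reviewer lists (empty string, null, and a search string that does not occur) could be pinned down along the following lines. This is a hedged sketch in plain Java: the helper `replaceOrNull` is hypothetical, and its null handling (return null if any argument is null) is an assumption modelled on common SQL null propagation, not behaviour confirmed anywhere in this thread.

```java
public class ReplaceEdgeCases {
    // Hypothetical helper: propagates null like many SQL scalar functions do
    // (an assumption), otherwise defers to java.lang.String#replace.
    public static String replaceOrNull(String str, String search, String replacement) {
        if (str == null || search == null || replacement == null) {
            return null;
        }
        return str.replace(search, replacement);
    }

    public static void main(String[] args) {
        // Search string not present: input comes back unchanged.
        System.out.println(replaceOrNull("hello world", "xyz", "flink")); // hello world
        // Empty input: nothing to replace.
        System.out.println(replaceOrNull("", "a", "b").isEmpty());        // true
        // Empty replacement: occurrences are removed.
        System.out.println(replaceOrNull("ababab", "ab", ""));            // prints an empty line
        // Any null argument yields null under the assumption above.
        System.out.println(replaceOrNull(null, "a", "b"));                // null
    }
}
```

An empty search string is deliberately left out, since the thread does not say what the function should return in that case.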
[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585020#comment-16585020 ] ASF GitHub Bot commented on FLINK-10145: yanghua commented on a change in pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576#discussion_r211091912 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ## @@ -459,6 +459,19 @@ trait ImplicitExpressionOperations { } } + /** +* Create a new string of the given string with non-overlapping occurrences +* of given search replaced with replacement. +* +* @param search Review comment: remove `@param` and `@return` to match the style of other methods
[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584999#comment-16584999 ] ASF GitHub Bot commented on FLINK-10145: Guibo-Pan commented on issue #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576#issuecomment-414101901 cc @yanghua @tillrohrmann a small feature~
[jira] [Updated] (FLINK-9849) Create hbase connector for hbase version to 2.0.1
[ https://issues.apache.org/jira/browse/FLINK-9849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-9849: -- Description: Currently hbase 1.4.3 is used for hbase connector. We should create connector for hbase 2.0.1 which was recently released. Since there are API changes for the 2.0.1 release, a new hbase connector is desirable. was: Currently hbase 1.4.3 is used for hbase connector. We should create connector for hbase 2.0.1 which was recently released. > Create hbase connector for hbase version to 2.0.1 > - > > Key: FLINK-9849 > URL: https://issues.apache.org/jira/browse/FLINK-9849 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > Attachments: hbase-2.1.0.dep
[jira] [Updated] (FLINK-9924) Upgrade zookeeper to 3.4.13
[ https://issues.apache.org/jira/browse/FLINK-9924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-9924: -- Description: zookeeper 3.4.13 is being released. ZOOKEEPER-2959 fixes data loss when observer is used ZOOKEEPER-2184 allows ZooKeeper Java clients to work in dynamic IP (container / cloud) environment was: zookeeper 3.4.13 is being released. ZOOKEEPER-2959 fixes data loss when observer is used ZOOKEEPER-2184 allows ZooKeeper Java clients to work in dynamic IP (container / cloud) environment > Upgrade zookeeper to 3.4.13 > --- > > Key: FLINK-9924 > URL: https://issues.apache.org/jira/browse/FLINK-9924 > Project: Flink > Issue Type: Task >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major
[jira] [Updated] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-10168:

Description:
Support filtering files by modified/created time in {{StreamExecutionEnvironment.readFile()}}.

For example, in a source directory with lots of files, we only want to read files that were created or modified after a specific time.

This API can expose a generic file filter function and let users define filtering rules. Currently Flink only supports filtering files by path. That is, the current API is {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file path filter. A more generic API that can take more filters could look like this:

1) {{FileInputFormat.setFilesFilters(List (PathFilter, ModifiedTimeFilter, ... ))}}
2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} exposes all file attributes that Flink's file system can provide, like path and modified time

I lean towards the 2nd option, because it gives users more flexibility to define complex filtering rules based on combinations of file attributes.

was:
Support filtering files by modified/created time in {{StreamExecutionEnvironment.readFile()}}.

For example, in a source directory with lots of files, we only want to read files that were created or modified after a specific time.

This API can expose a generic file filter function and let users define filtering rules. Currently Flink only supports filtering files by path. That is, the current API is {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file path filter. A more generic API that can take more filters could look like this:

1) {{FileInputFormat.setFilesFilters(List (PathFilter, ModifiedTimeFilter, ... ))}}
2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} exposes all file attributes that Flink's file system can provide, like path and modified time

I lean towards the 2nd option, because it gives users more flexibility to define filtering rules based on combinations of file attributes.

> support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.6.0
> Reporter: Bowen Li
> Assignee: buptljy
> Priority: Major
> Fix For: 1.7.0
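The second option in the description, a single filter that sees all file attributes, might be sketched as follows. Everything here is hypothetical: `FileAttributes`, `FileFilter`, `modifiedAfter`, `pathEndsWith`, and `allOf` are names invented for this illustration and are not existing Flink classes or methods; the sketch only shows how one filter type could express rules that combine path and modification time.

```java
public class FileFilterSketch {
    // Hypothetical holder for the attributes a file system could expose.
    public static final class FileAttributes {
        final String path;
        final long modificationTimeMillis;

        public FileAttributes(String path, long modificationTimeMillis) {
            this.path = path;
            this.modificationTimeMillis = modificationTimeMillis;
        }
    }

    // Hypothetical single-filter interface in the spirit of option 2.
    public interface FileFilter {
        boolean accept(FileAttributes file);
    }

    // Accepts files modified strictly after the given cutoff.
    public static FileFilter modifiedAfter(long cutoffMillis) {
        return file -> file.modificationTimeMillis > cutoffMillis;
    }

    // Accepts files whose path ends with the given suffix.
    public static FileFilter pathEndsWith(String suffix) {
        return file -> file.path.endsWith(suffix);
    }

    // Accepts a file only if every given filter accepts it.
    public static FileFilter allOf(FileFilter... filters) {
        return file -> {
            for (FileFilter f : filters) {
                if (!f.accept(file)) {
                    return false;
                }
            }
            return true;
        };
    }

    public static void main(String[] args) {
        FileFilter filter = allOf(pathEndsWith(".csv"), modifiedAfter(1_000L));
        System.out.println(filter.accept(new FileAttributes("/data/a.csv", 2_000L))); // true
        System.out.println(filter.accept(new FileAttributes("/data/b.csv", 500L)));   // false
        System.out.println(filter.accept(new FileAttributes("/data/c.txt", 2_000L))); // false
    }
}
```

With this shape, users compose rules themselves instead of the API having to accept a list of specialised filter types, which is the flexibility argument made for option 2.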
[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584865#comment-16584865 ] Bowen Li commented on FLINK-10168: -- Hi [~wind_ljy] , please see the updated description of what I mean by 'generic'. File name, its prefix, and suffix can be deduced from file path. Though it might be good to abstract them and provide to users.
[jira] [Updated] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-10168:

Description:
Support filtering files by modified/created time in {{StreamExecutionEnvironment.readFile()}}.

For example, in a source directory with lots of files, we only want to read files that were created or modified after a specific time.

This API can expose a generic file filter function and let users define filtering rules. Currently Flink only supports filtering files by path. That is, the current API is {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file path filter. A more generic API that can take more filters could look like this:

1) {{FileInputFormat.setFilesFilters(List (PathFilter, ModifiedTimeFilter, ... ))}}
2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} exposes all file attributes that Flink's file system can provide, like path and modified time

I lean towards the 2nd option, because it gives users more flexibility to define filtering rules based on combinations of file attributes.

was:
Support filtering files by modified/created time in {{StreamExecutionEnvironment.readFile()}}.

For example, in a source directory with lots of files, we only want to read files that were created or modified after a specific time.

This API can expose a generic file filter function and let users define filtering rules. Currently Flink only supports filtering files by path.
[jira] [Updated] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10145: --- Labels: pull-request-available (was: )
[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584852#comment-16584852 ] ASF GitHub Bot commented on FLINK-10145: Guibo-Pan opened a new pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576 ## What is the purpose of the change *This pull request add replace supported in TableAPI and SQL* ## Brief change log - *Add replace supported in TableAPI and SQL* ## Verifying this change This change is already covered by existing tests, such as *ScalarFunctionsTest#testReplace*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs)
[GitHub] jrthe42 edited a comment on issue #6575: [hotfix][table] Fix bug in RowtimeValidator when getting custom TimestampExtractor
jrthe42 edited a comment on issue #6575: [hotfix][table] Fix bug in RowtimeValidator when getting custom TimestampExtractor URL: https://github.com/apache/flink/pull/6575#issuecomment-414063096 A test has been added for using custom ```TimestampExtractor```. cc @twalthr . This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9116) Introduce getAll and removeAll for MapState
[ https://issues.apache.org/jira/browse/FLINK-9116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584760#comment-16584760 ] ASF GitHub Bot commented on FLINK-9116: --- klion26 commented on issue #6558: [FLINK-9116] Introduce getAll and removeAll for MapState URL: https://github.com/apache/flink/pull/6558#issuecomment-414059575 Hi @StefanRRichter, I have addressed your comments. Please take another look. In my opinion, adding the getAll/removeAll interface is convenient for those who want to get all values for a collection of keys, or remove a collection of keys from the state. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Introduce getAll and removeAll for MapState > --- > > Key: FLINK-9116 > URL: https://issues.apache.org/jira/browse/FLINK-9116 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > We have supported {{putAll(List)}} in {{MapState}}, I think we should also > support {{getAll(Iterable)}} and {{removeAll(Iterable)}} in {{MapState}}, it > can be convenient in some scenario. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
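Editorial note: the bulk operations proposed in FLINK-9116 can be sketched against a plain `java.util.Map` standing in for the state. This is only an illustration of the intended semantics; the class `MapStateBulkOpsSketch`, the helper signatures, and the choice to skip keys without a mapping are assumptions and not Flink's `MapState` API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class MapStateBulkOpsSketch {

    // Hypothetical helper: returns the entries for the requested keys.
    // Keys without a mapping are skipped (an assumption of this sketch).
    static <K, V> Map<K, V> getAll(Map<K, V> state, Iterable<? extends K> keys) {
        Map<K, V> result = new LinkedHashMap<>();
        for (K key : keys) {
            if (state.containsKey(key)) {
                result.put(key, state.get(key));
            }
        }
        return result;
    }

    // Hypothetical helper: removes every requested key from the state.
    // Removing a key that is not present is a no-op.
    static <K, V> void removeAll(Map<K, V> state, Iterable<? extends K> keys) {
        for (K key : keys) {
            state.remove(key);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        state.put("b", 2);
        state.put("c", 3);
        System.out.println(getAll(state, Arrays.asList("a", "c", "missing")));
        removeAll(state, Arrays.asList("a", "missing"));
        System.out.println(state.keySet());
    }
}
```

Without such helpers, callers write the same loops around single-key `get` and `remove` calls themselves, which is the convenience argument made in the comment.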
[jira] [Commented] (FLINK-9962) allow users to specify TimeZone in DateTimeBucketer
[ https://issues.apache.org/jira/browse/FLINK-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584676#comment-16584676 ] ASF GitHub Bot commented on FLINK-9962: --- kl0u commented on issue #6492: [FLINK-9962] [FS connector] allow users to specify TimeZone in DateTimeBucketer URL: https://github.com/apache/flink/pull/6492#issuecomment-414036401 Hi @bowenli86 ! Are you planning to continue working on this issue? If not, I could work on that next week ;) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > allow users to specify TimeZone in DateTimeBucketer > --- > > Key: FLINK-9962 > URL: https://issues.apache.org/jira/browse/FLINK-9962 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.5.1, 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently {{DateTimeBucketer}} will return a bucket path by using local > timezone. We should add a {{timezone}} constructor param to allow users to > specify a timezone. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
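Editorial note: the effect of an explicit time zone on the bucket path can be sketched with `java.time`. This is only an illustration of the idea in FLINK-9962, not the `DateTimeBucketer` implementation; the class `BucketPathSketch`, the method `bucketPath`, and the pattern used in the example are assumptions.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class BucketPathSketch {

    // Builds a bucket path from a timestamp, formatting it in the
    // given zone instead of the JVM's local time zone.
    static String bucketPath(String basePath, String pattern, ZoneId zone, long timestampMillis) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return basePath + "/" + formatter.format(Instant.ofEpochMilli(timestampMillis));
    }

    public static void main(String[] args) {
        // The same instant lands in different buckets depending on the zone.
        System.out.println(bucketPath("/data", "yyyy-MM-dd--HH", ZoneOffset.UTC, 0L));
        System.out.println(bucketPath("/data", "yyyy-MM-dd--HH", ZoneOffset.ofHours(8), 0L));
    }
}
```

With the zone passed in explicitly, two jobs running on machines with different local time zones would write the same record to the same bucket.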
[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584675#comment-16584675 ] buptljy commented on FLINK-10168: - [~phoenixjiangnan] Thanks! I think this will be a good improvement. We can define some readfile functions, which are based on the prefix and suffix of file names and last modified time. However, is it necessary to expose a generic filter function and let developers define their own file filters? Do we really have so many different application scenarios of readfile function? As far as I know, most cases can be covered by the three functions above. > support filtering files by modified/created time in > StreamExecutionEnvironment.readFile() > - > > Key: FLINK-10168 > URL: https://issues.apache.org/jira/browse/FLINK-10168 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: buptljy >Priority: Major > Fix For: 1.7.0 > > > support filtering files by modified/created time in > {{StreamExecutionEnvironment.readFile()}} > for example, in a source dir with lots of file, we only want to read files > that is created or modified after a specific time. > This API can expose a generic filter function of files, and let users define > filtering rules. Currently Flink only supports filtering files by path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
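Editorial note: the trade-off discussed above (a few fixed filters versus one generic filter function) can be sketched with `java.util.function.Predicate`. This is only an illustration; `FileFilterSketch`, `FileInfo`, and the helper names are hypothetical and are not part of `StreamExecutionEnvironment.readFile()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class FileFilterSketch {

    // Minimal stand-in for file metadata (hypothetical type).
    static final class FileInfo {
        final String name;
        final long modifiedMillis;

        FileInfo(String name, long modifiedMillis) {
            this.name = name;
            this.modifiedMillis = modifiedMillis;
        }
    }

    // The three fixed filters mentioned in the comment, each expressed
    // as a special case of a generic predicate.
    static Predicate<FileInfo> hasPrefix(String prefix) {
        return file -> file.name.startsWith(prefix);
    }

    static Predicate<FileInfo> hasSuffix(String suffix) {
        return file -> file.name.endsWith(suffix);
    }

    static Predicate<FileInfo> modifiedAfter(long cutoffMillis) {
        return file -> file.modifiedMillis > cutoffMillis;
    }

    // Generic entry point: returns the names of the files accepted by the filter.
    static List<String> select(List<FileInfo> files, Predicate<FileInfo> filter) {
        List<String> selected = new ArrayList<>();
        for (FileInfo file : files) {
            if (filter.test(file)) {
                selected.add(file.name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<FileInfo> files = Arrays.asList(
                new FileInfo("part-1.csv", 100L),
                new FileInfo("part-2.csv", 300L),
                new FileInfo("tmp-3.csv", 400L));
        // Fixed filters compose through the generic predicate interface.
        System.out.println(select(files, hasPrefix("part-").and(modifiedAfter(200L))));
    }
}
```

In this shape the fixed filters cost little to provide on top of a generic one, while users with unusual rules can still pass their own predicate.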
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584672#comment-16584672 ] Timo Walther commented on FLINK-10163: -- [~suez1224] Yes, I think the full implementation as [~hequn8128] added it to the issue should be done with the DDL FLIP. We need a proper parser for this that also allows escaping like {{CREATE VIEW `My Table Name`}} and specifying optional column names and types. Also {{DROP VIEW}} and {{REPLACE VIEW}}. This should be integrated with the Table API. This issue describes just an MVP solution for allowing the very basic {{CREATE_VIEW name AS query}} for the SQL Client, because there is currently no way of defining a similar functionality like {{tableEnv.registerTable("name", ...)}} in Table API. > Support CREATE VIEW in SQL Client > - > > Key: FLINK-10163 > URL: https://issues.apache.org/jira/browse/FLINK-10163 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > The possibility to define a name for a subquery would improve the usability > of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a > virtual table. > > Example: > {code} > CREATE VIEW viewName > [ '(' columnName [, columnName ]* ')' ] > AS Query > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] buptljy reassigned FLINK-10168: --- Assignee: buptljy > support filtering files by modified/created time in > StreamExecutionEnvironment.readFile() > - > > Key: FLINK-10168 > URL: https://issues.apache.org/jira/browse/FLINK-10168 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: buptljy >Priority: Major > Fix For: 1.7.0 > > > support filtering files by modified/created time in > {{StreamExecutionEnvironment.readFile()}} > for example, in a source dir with lots of file, we only want to read files > that is created or modified after a specific time. > This API can expose a generic filter function of files, and let users define > filtering rules. Currently Flink only supports filtering files by path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9407) Support orc rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584667#comment-16584667 ]

ASF GitHub Bot commented on FLINK-9407:

phenixmzy edited a comment on issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink writer
URL: https://github.com/apache/flink/pull/6075#issuecomment-414035357

@zhangminglei Can the OrcFileWriter with BucketingSink roll files when the batch size is reached? It seems to write one file per record.

The code:
===
    def orcSchemaMetaInfo = String.format(
      "struct<%s,%s,%s...,%s>",
      "nt:string", "event_time:string", "event_id:string", .., "appname:string")

    def getRowSink(distPath : String) = {
      val sink = new BucketingSink[Row](distPath + "/with-bucket/")
      sink.setBatchSize(1024 * 1024 * 1024)
        .setBucketer(new DateTimeBucketer[Row]("MMdd/HHmm"))
        .setWriter(new OrcFileWriter[Row](orcSchemaMetaInfo))
        .setPartPrefix("sdk-etl")
      sink
    }

    def getOrcRow(item : sdkItem) : Row = {
      val row = Row.of(
        item.getNt, item.getEvent_time, item.getEvent_id,.,item.getAppid,item.getAppname)
      row
    }
    ...
    val kafkaConsumer = new FlinkKafkaConsumer011(inputTopic, new SimpleStringSchema, params.getProperties)
    val messageStream = env.addSource(kafkaConsumer)
      .flatMap(in => SDKParse.parseSDK(in, inputTopic))
      .filter(item => item != None)
      .flatMap(item => Some(item).get)
      .map(item => getOrcRow(item))
    messageStream.addSink(getRowSink(distPath))

> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
> Issue Type: New Feature
> Components: filesystem-connector
> Reporter: zhangminglei
> Assignee: zhangminglei
> Priority: Major
> Labels: pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and {{AvroKeyValueSinkWriter}}. I would suggest add an orc writer for rolling sink.
> Below, FYI.
> I tested the PR and verify the results with spark sql. Obviously, we can get the results of what we had written down before. But I will give more tests in the next couple of days. Including the performance under compression with short checkpoint intervals. And more UTs.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
> scala> res3.show(3)
> +-----+---+-------+
> | name|age|married|
> +-----+---+-------+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-----+---+-------+
> only showing top 3 rows
> {code}
[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers
[ https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584658#comment-16584658 ] ASF GitHub Bot commented on FLINK-10068: twalthr commented on issue #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#issuecomment-414035015 Thanks for the update @StefanRRichter. LGTM % the single comment. Feel free to merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add documentation for async/RocksDB-based timers > > > Key: FLINK-10068 > URL: https://issues.apache.org/jira/browse/FLINK-10068 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > Documentation how to activate RocksDB based timers, and update that > snapshotting now works async, expect for heap-timers + > rocks-incremental-snapshot). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers
[ https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584657#comment-16584657 ]

ASF GitHub Bot commented on FLINK-10068:

twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r211066338

## File path: docs/dev/stream/operators/process_function.md ##

@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.

-**Note:** Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
+Note Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.

 ### Fault Tolerance

 Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored.

-**Note:** Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
+Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
 This might happen when an application recovers from a failure or when it is started from a savepoint.

-**Note:** Timers are always synchronously checkpointed, regardless of the configuration of the state backends.
-Therefore, a large number of timers can significantly increase checkpointing time.
-See the "Timer Coalescing" section for advice on how to reduce the number of timers.
+Note Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`).

Review comment:
If there is nothing more to say in your opinion, then I'm fine with this.
[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#discussion_r211066338 ## File path: docs/dev/stream/operators/process_function.md ## @@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once. -**Note:** Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state. +Note Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state. ### Fault Tolerance Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored. -**Note:** Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. +Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. This might happen when an application recovers from a failure or when it is started from a savepoint. -**Note:** Timers are always synchronously checkpointed, regardless of the configuration of the state backends. -Therefore, a large number of timers can significantly increase checkpointing time. -See the "Timer Coalescing" section for advice on how to reduce the number of timers. +Note Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`). 
Review comment: If there is nothing more to say in your opinion, then I'm fine with this.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
[jira] [Commented] (FLINK-10068) Add documentation for async/RocksDB-based timers
[ https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584656#comment-16584656 ]

ASF GitHub Bot commented on FLINK-10068:

twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r211066305

## File path: docs/dev/stream/operators/process_function.md ##
@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.

Review comment: Writing more docs about this might be a bigger change, but moving 10 lines into a subsection on a different page can be done in this PR.

> Add documentation for async/RocksDB-based timers
>
> Key: FLINK-10068
> URL: https://issues.apache.org/jira/browse/FLINK-10068
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
>
> Documentation on how to activate RocksDB-based timers, and an update that snapshotting now works async (except for heap-timers + rocks-incremental-snapshot).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)