Re: Issue with flink 1.16 and hive dialect
Hi, Ram. Thanks for reaching out.

1: About the Hive dialect issue, maybe you're using JDK 11? There's a known issue, FLINK-27450 [1]. The root cause is that Hive doesn't fully support JDK 11; your specific case is tracked in HIVE-21584 [2]. Flink has upgraded its Hive 2.x dependency to 2.3.9 to include that patch, but unfortunately, IIRC, the patch is still not available in Hive 3.x.

2: About the table-creation issue, thanks for reporting it. I tried it and it turns out to be a bug; I have created FLINK-32596 [3] to track it. It only happens with the combination of the Flink dialect, a partitioned table, and the Hive catalog. In most cases we recommend users use the Hive dialect to create Hive tables, so we were missing a test covering creating a partitioned table in the Hive catalog with the Flink dialect, and this bug has been hidden for a while. As a workaround, I think you can try to create the table in Hive itself with the following SQL:

CREATE TABLE testsource (
  `geo_altitude` FLOAT
)
PARTITIONED BY (`date` STRING)
TBLPROPERTIES (
  'sink.partition-commit.delay' = '1 s',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

[1] https://issues.apache.org/jira/browse/FLINK-27450
[2] https://issues.apache.org/jira/browse/HIVE-21584
[3] https://issues.apache.org/jira/browse/FLINK-32596

Best regards,
Yuxia

From: "ramkrishna vasudevan"
To: "User", "dev"
Date: Friday, July 14, 2023, 8:46:20 PM
Subject: Issue with flink 1.16 and hive dialect

Hi All, I am not sure if this was already discussed in this forum. In our setup with Flink 1.16.0 we have ensured that the setup has everything necessary for the Hive catalog to work. The Flink dialect works fine functionally (with some issues; I will come to that later).
But when I follow the steps here in https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hive-dialect/queries/overview/#examples I am getting an exception once I set the Hive dialect:

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
Caused by: java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader cannot be cast to class java.net.URLClassLoader (jdk.internal.loader.ClassLoaders$AppClassLoader and java.net.URLClassLoader are in module java.base of loader 'bootstrap')
at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:413) ~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:389) ~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
at org.apache.flink.table.planner.delegation.hive.HiveSessionState.<init>(HiveSessionState.java:80) ~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
at org.apache.flink.table.planner.delegation.hive.HiveSessionState.startSessionState(HiveSessionState.java:128) ~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]

I have ensured the dialect-related steps are completely followed, including https://issues.apache.org/jira/browse/FLINK-25128

In the Flink catalog, if we create a table:
CREATE TABLE testsource (
  `date` STRING,
  `geo_altitude` FLOAT
)
PARTITIONED BY (`date`)
WITH (
  'connector' = 'hive',
  'sink.partition-commit.delay' = '1 s',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

The partition always gets created on the last set of columns and not on the columns that we specify. Is this a known bug?

Regards,
Ram
Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award
Congratulations!

Best regards,
Yuxia

From: "Pushpa Ramakrishnan"
To: "Xintong Song"
Cc: "dev", "User"
Date: Monday, July 3, 2023, 8:36:30 PM
Subject: Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

Congratulations 🥳

On 03-Jul-2023, at 3:30 PM, Xintong Song wrote:

Dear Community,

I'm pleased to share this good news with everyone. As some of you may have already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].

"Apache Flink greatly expanded the use of stream data-processing." -- SIGMOD Awards Committee

SIGMOD is one of the most influential data management research conferences in the world. The Systems Award is given to an individual or set of individuals to recognize the development of a software or hardware system whose technical contributions have had significant impact on the theory or practice of large-scale data management systems. Winning this award is a strong recognition from academia of Flink's technological advancement and industry influence.

As an open-source project, Flink wouldn't have come this far without the wide, active and supportive community behind it. Kudos to all of us who helped make this happen, including the over 1,400 contributors and many others who contributed in ways beyond code.

Best,
Xintong (on behalf of the Flink PMC)

[1] https://sigmod.org/2023-sigmod-systems-award/
Re: [Slack] Request to upload new invitation link
Hi, Stephen. Welcome to the Flink Slack workspace. Here's my invitation link: https://join.slack.com/t/apache-flink/shared_invite/zt-1y7kmx7te-zUg1yfLdGu3Th9En_p4n~g

Best regards,
Yuxia

From: "Stephen Chu"
To: "User"
Cc: "Satyam Shanker", "Vaibhav Gosain", "Steve Jiang"
Date: Thursday, June 29, 2023, 12:49:21 AM
Subject: [Slack] Request to upload new invitation link

Hi there, I'd love to join the Flink Slack channel, but it seems the link is outdated: https://join.slack.com/t/apache-flink/shared_invite/zt-1thin01ch-tYuj6Zwu8qf0QsivHY0anw Would someone be able to update or send me a new invite link?

Thanks,
Stephen
Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect
Hi, Jingsong. It's hard to provide such an option, given that we also want to decouple the Hive dialect from the Flink planner. If we kept the fallback behavior, HiveParser would still depend on the `ParserImpl` provided by flink-table-planner. But to minimize the impact on users and be more user-friendly, when HiveParser fails to parse a statement, the error message will remind users that they can use `SET table.sql-dialect = default;` to switch to Flink's default dialect.

Best regards,
Yuxia

----- Original Mail -----
From: "Jingsong Li"
To: "Rui Li"
Cc: "dev", "yuxia", "User"
Date: Tuesday, May 30, 2023, 3:21:56 PM
Subject: Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

+1, the fallback looks weird now, it is outdated. But it would be good to provide an option. I don't know if there are some users who depend on this fallback.

Best,
Jingsong

On Tue, May 30, 2023 at 1:47 PM Rui Li wrote:
>
> +1, the fallback was just intended as a temporary workaround to run
> catalog/module related statements with hive dialect.
>
> On Mon, May 29, 2023 at 3:59 PM Benchao Li wrote:
>>
>> Big +1 on this, thanks yuxia for driving this!
>>
>> yuxia wrote on Monday, May 29, 2023 at 14:55:
>>
>> > Hi, community.
>> >
>> > I want to start a discussion about making the Hive dialect not fall back
>> > to Flink's default dialect.
>> >
>> > Currently, when HiveParser fails to parse a SQL statement in Hive dialect,
>> > it falls back to Flink's default parser [1] to handle Flink-specific
>> > statements like "CREATE CATALOG xx with (xx);".
>> >
>> > As I've been working on the Hive dialect and have talked with community
>> > users who use it recently, I'm thinking of throwing an exception directly
>> > instead of falling back to Flink's default dialect when HiveParser fails
>> > to parse a statement.
>> >
>> > Here are some reasons:
>> >
>> > First of all, the fallback hides errors in the Hive dialect.
For example, we
>> > found we couldn't use the Hive dialect any more with the Flink SQL client in the
>> > release validation phase [2]. Finally we found that a modification in the Flink
>> > SQL client caused it, but our test case couldn't catch it earlier: although
>> > HiveParser failed to parse the statement, it fell back to the default parser
>> > and the test case still passed.
>> >
>> > Second, conceptually, the Hive dialect should have nothing to do with Flink's
>> > default dialect. They are two totally different dialects. If we do need a
>> > dialect mixing the Hive dialect and the default dialect, maybe we should
>> > propose a new hybrid dialect and announce the hybrid behavior to users.
>> > Also, the fallback behavior has confused some users; I have been asked about
>> > it by community users. Throwing an exception directly when failing to parse a
>> > SQL statement in Hive dialect would be more intuitive.
>> >
>> > Last but not least, it's important to decouple Hive from the Flink planner [3]
>> > before we can externalize the Hive connector [4]. If we still fall back to
>> > Flink's default dialect, we will need to depend on `ParserImpl` in the Flink
>> > planner, which blocks us from removing the provided dependency of the Hive
>> > dialect as well as externalizing the Hive connector.
>> >
>> > Although we never announced the fallback behavior, some users may implicitly
>> > depend on it in their SQL jobs. So, I hereby open the discussion about
>> > abandoning the fallback behavior to make the Hive dialect clear and isolated.
>> > Please note it won't break Hive syntax, but Flink-specific syntax may fail
>> > afterwards. For failed statements, you can use `SET
>> > table.sql-dialect=default;` to switch to the Flink dialect.
>> > If there are Flink-specific statements we find should be included in the
>> > Hive dialect for ease of use, I think we can still add them as special
>> > cases to the Hive dialect.
>> >
>> > I look forward to your feedback.
I'd love to hear the feedback from the
>> > community to decide the next steps.
>> >
>> > [1]: https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
>> > [2]: https://issues.apache.org/jira/browse/FLINK-26681
>> > [3]: https://issues.apache.org/jira/browse/FLINK-31413
>> > [4]: https://issues.apache.org/jira/browse/FLINK-30064
>> >
>> > Best regards,
>> > Yuxia
>>
>> --
>> Best,
>> Benchao Li
>
> --
> Best regards!
> Rui Li
[DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect
Hi, community.

I want to start a discussion about making the Hive dialect not fall back to Flink's default dialect.

Currently, when HiveParser fails to parse a SQL statement in Hive dialect, it falls back to Flink's default parser [1] to handle Flink-specific statements like "CREATE CATALOG xx with (xx);".

As I've been working on the Hive dialect and have talked with community users who use it recently, I'm thinking of throwing an exception directly instead of falling back to Flink's default dialect when HiveParser fails to parse a statement.

Here are some reasons:

First of all, the fallback hides errors in the Hive dialect. For example, we found we couldn't use the Hive dialect any more with the Flink SQL client in the release validation phase [2]. Finally we found that a modification in the Flink SQL client caused it, but our test case couldn't catch it earlier: although HiveParser failed to parse the statement, it fell back to the default parser and the test case still passed.

Second, conceptually, the Hive dialect should have nothing to do with Flink's default dialect. They are two totally different dialects. If we do need a dialect mixing the Hive dialect and the default dialect, maybe we should propose a new hybrid dialect and announce the hybrid behavior to users. Also, the fallback behavior has confused some users; I have been asked about it by community users. Throwing an exception directly when failing to parse a SQL statement in Hive dialect would be more intuitive.

Last but not least, it's important to decouple Hive from the Flink planner [3] before we can externalize the Hive connector [4]. If we still fall back to Flink's default dialect, we will need to depend on `ParserImpl` in the Flink planner, which blocks us from removing the provided dependency of the Hive dialect as well as externalizing the Hive connector.

Although we never announced the fallback behavior, some users may implicitly depend on it in their SQL jobs.
So, I hereby open the discussion about abandoning the fallback behavior to make the Hive dialect clear and isolated. Please note it won't break Hive syntax, but Flink-specific syntax may fail afterwards. For failed statements, you can use `SET table.sql-dialect=default;` to switch to the Flink dialect. If there are Flink-specific statements we find should be included in the Hive dialect for ease of use, I think we can still add them as special cases to the Hive dialect.

I look forward to your feedback, and I'd love to hear from the community to decide the next steps.

[1]: https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
[2]: https://issues.apache.org/jira/browse/FLINK-26681
[3]: https://issues.apache.org/jira/browse/FLINK-31413
[4]: https://issues.apache.org/jira/browse/FLINK-30064

Best regards,
Yuxia
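To make the proposal concrete, here is a sketch of a SQL-client session after the fallback is removed (the catalog type shown is just an example): a Flink-specific statement issued under the Hive dialect would fail with a parse exception instead of silently succeeding, and the user switches dialects explicitly with `SET table.sql-dialect`:

```sql
-- Hive dialect active: Hive syntax keeps working
SET table.sql-dialect = hive;
CREATE TABLE hive_tbl (x INT) STORED AS ORC;

-- A Flink-specific statement such as CREATE CATALOG would now raise a parse
-- exception under the Hive dialect; switch to the default dialect explicitly:
SET table.sql-dialect = default;
CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory');

-- and switch back to continue with Hive-syntax statements
SET table.sql-dialect = hive;
```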
Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)
Congratulations!

Best regards,
Yuxia

From: "Andrew Otto"
To: "Matthias Pohl"
Cc: "Jing Ge", "Leonard Xu", "Yu Li", "dev", "User", "user-zh"
Date: Monday, March 27, 2023, 8:57:50 PM
Subject: Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

Exciting! If this ends up working well, Wikimedia Foundation would love to try it out!

On Mon, Mar 27, 2023 at 8:39 AM Matthias Pohl via user <user@flink.apache.org> wrote:

Congratulations and good luck with pushing the project forward.

On Mon, Mar 27, 2023 at 2:35 PM Jing Ge via user <user@flink.apache.org> wrote:

Congrats!

Best regards,
Jing

On Mon, Mar 27, 2023 at 2:32 PM Leonard Xu <xbjt...@gmail.com> wrote:

Congratulations!

Best,
Leonard

On Mar 27, 2023, at 5:23 PM, Yu Li <car...@gmail.com> wrote:

Dear Flinkers,

As you may have noticed, we are pleased to announce that Flink Table Store has joined the Apache Incubator as a separate project called Apache Paimon(incubating) [1] [2] [3]. The new project still aims at building a streaming data lake platform for high-speed data ingestion, change data tracking and efficient real-time analytics, with the vision of supporting a larger ecosystem and establishing a vibrant and neutral open source community.

We would like to thank everyone for their great support and efforts for the Flink Table Store project, and warmly welcome everyone to join the development and activities of the new project. Apache Flink will continue to be one of the first-class citizens supported by Paimon, and we believe that the Flink and Paimon communities will maintain close cooperation.
Best Regards,
Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)

[1] https://paimon.apache.org/
[2] https://github.com/apache/incubator-paimon
[3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
Re: Are the Table API Connectors production ready?
The plan shows the filter has been pushed down. But remember: although it is pushed down, the filesystem table source won't actually apply the filter, so it will still scan all the files.

Best regards,
Yuxia

From: "Maryam Moafimadani"
To: "Hang Ruan"
Cc: "yuxia", "ravi suryavanshi", "Yaroslav Tkachenko", "Shammon FY", "User"
Date: Monday, March 13, 2023, 10:07:57 PM
Subject: Re: Are the Table API Connectors production ready?

Hi All,
It's exciting to see file filtering in the plan for development. I am curious whether the following query on a filesystem connector would actually push down the filter on the metadata column `file.path`?

Select score, `file.path` from MyUserTable WHERE `file.path` LIKE '%prefix_%'

== Optimized Execution Plan ==
Calc(select=[score, file.path], where=[LIKE(file.path, '%2022070611284%')])
+- TableSourceScan(table=[[default_catalog, default_database, MyUserTable, filter=[LIKE(file.path, _UTF-16LE'%2022070611284%')]]], fields=[score, file.path])

Thanks,
Maryam

On Mon, Mar 13, 2023 at 8:55 AM Hang Ruan <ruanhang1...@gmail.com> wrote:

Hi, yuxia,
I would like to help to complete this task.

Best,
Hang

yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Monday, March 13, 2023 at 09:32:

Yeah, you're right. We don't provide filtering files with patterns, and actually we already have a JIRA [1] for it. I intended to do this in the past but didn't have much time. Anyone who is interested can take it over. We're happy to help review.
[1] https://issues.apache.org/jira/browse/FLINK-17398

Best regards,
Yuxia

From: "User" <user@flink.apache.org>
To: "Yaroslav Tkachenko" <yaros...@goldsky.com>, "Shammon FY" <zjur...@gmail.com>
Cc: "User" <user@flink.apache.org>
Date: Monday, March 13, 2023, 12:36:46 AM
Subject: Re: Are the Table API Connectors production ready?

Thanks a lot, Yaroslav and Shammon. I want to use the Filesystem connector. I tried it; it works well while the job is running, but if the job is restarted, it processes all the files again. I could not find a move or delete option for files after they are collected. Also, I could not find filtering using patterns; pattern matching is required as different files exist in the same folder.

Regards,
Ravi

On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY <zjur...@gmail.com> wrote:

Hi Ravi

Agree with Yaroslav, and if you find any problems in use, you can create an issue in JIRA: https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK . I have used kafka/jdbc/hive in production too, they work well.

Best,
Shammon

On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko <yaros...@goldsky.com> wrote:

Hi Ravi,

All of them should be production ready. I've personally used half of them in production. Do you have any specific concerns?

On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:

Hi,
Can anyone help me here?
Thanks and regards,
Ravi

On Monday, 27 February, 2023 at 09:33:18 am IST, ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:

Hi Team,
In Flink 1.16.0, we would like to use some of the Table API Connectors for production. Kindly let me know if the below connectors are production ready or only for testing purposes.

Filesystem (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/): Source = Bounded and Unbounded Scan, Lookup; Sink = Streaming Sink, Batch Sink
Elasticsearch 6.x & 7.x (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/elasticsearch/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Opensearch 1.x & 2.x (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/opensearch/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Apache Kafka 0.10+ (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/): Source = Unbounded Scan; Sink = Streaming Sink, Batch Sink
Amazon DynamoDB (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/dynamodb/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Re: Are the Table API Connectors production ready?
Thanks Hang for taking it. Assigned to you~

Best regards,
Yuxia

From: "Hang Ruan"
To: "yuxia"
Cc: "ravi suryavanshi", "Yaroslav Tkachenko", "Shammon FY", "User"
Date: Monday, March 13, 2023, 8:54:49 PM
Subject: Re: Are the Table API Connectors production ready?

Hi, yuxia,
I would like to help to complete this task.

Best,
Hang

yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Monday, March 13, 2023 at 09:32:

Yeah, you're right. We don't provide filtering files with patterns, and actually we already have a JIRA [1] for it. I intended to do this in the past but didn't have much time. Anyone who is interested can take it over. We're happy to help review.

[1] https://issues.apache.org/jira/browse/FLINK-17398

Best regards,
Yuxia

From: "User" <user@flink.apache.org>
To: "Yaroslav Tkachenko" <yaros...@goldsky.com>, "Shammon FY" <zjur...@gmail.com>
Cc: "User" <user@flink.apache.org>
Date: Monday, March 13, 2023, 12:36:46 AM
Subject: Re: Are the Table API Connectors production ready?

Thanks a lot, Yaroslav and Shammon. I want to use the Filesystem connector. I tried it; it works well while the job is running, but if the job is restarted, it processes all the files again. I could not find a move or delete option for files after they are collected. Also, I could not find filtering using patterns; pattern matching is required as different files exist in the same folder.

Regards,
Ravi

On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY <zjur...@gmail.com> wrote:

Hi Ravi

Agree with Yaroslav, and if you find any problems in use, you can create an issue in JIRA: https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK . I have used kafka/jdbc/hive in production too, they work well.
Best,
Shammon

On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko <yaros...@goldsky.com> wrote:

Hi Ravi,

All of them should be production ready. I've personally used half of them in production. Do you have any specific concerns?

On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:

Hi,
Can anyone help me here?

Thanks and regards,
Ravi

On Monday, 27 February, 2023 at 09:33:18 am IST, ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:

Hi Team,
In Flink 1.16.0, we would like to use some of the Table API Connectors for production. Kindly let me know if the below connectors are production ready or only for testing purposes.

Filesystem (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/): Source = Bounded and Unbounded Scan, Lookup; Sink = Streaming Sink, Batch Sink
Elasticsearch 6.x & 7.x (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/elasticsearch/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Opensearch 1.x & 2.x (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/opensearch/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Apache Kafka 0.10+ (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/): Source = Unbounded Scan; Sink = Streaming Sink, Batch Sink
Amazon DynamoDB (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/dynamodb/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Amazon Kinesis Data Streams (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kinesis/): Source = Unbounded Scan; Sink = Streaming Sink
Amazon Kinesis Data Firehose (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/firehose/): Source = Not supported; Sink = Streaming Sink
JDBC (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/jdbc/): Source = Bounded Scan, Lookup; Sink = Streaming Sink, Batch Sink
Apache HBase 1.4.x & 2.2.x (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hbase/): Source = Bounded Scan, Lookup; Sink = Streaming Sink, Batch Sink
Apache Hive (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/)

Thanks and regards
Re: Are the Table API Connectors production ready?
Yeah, you're right. We don't provide filtering files with patterns, and actually we already have a JIRA [1] for it. I intended to do this in the past but didn't have much time. Anyone who is interested can take it over. We're happy to help review.

[1] https://issues.apache.org/jira/browse/FLINK-17398

Best regards,
Yuxia

From: "User"
To: "Yaroslav Tkachenko", "Shammon FY"
Cc: "User"
Date: Monday, March 13, 2023, 12:36:46 AM
Subject: Re: Are the Table API Connectors production ready?

Thanks a lot, Yaroslav and Shammon. I want to use the Filesystem connector. I tried it; it works well while the job is running, but if the job is restarted, it processes all the files again. I could not find a move or delete option for files after they are collected. Also, I could not find filtering using patterns; pattern matching is required as different files exist in the same folder.

Regards,
Ravi

On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY wrote:

Hi Ravi

Agree with Yaroslav, and if you find any problems in use, you can create an issue in JIRA: https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK . I have used kafka/jdbc/hive in production too, they work well.

Best,
Shammon

On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko <yaros...@goldsky.com> wrote:

Hi Ravi,

All of them should be production ready. I've personally used half of them in production. Do you have any specific concerns?

On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:

Hi,
Can anyone help me here?
Thanks and regards,
Ravi

On Monday, 27 February, 2023 at 09:33:18 am IST, ravi_suryavanshi.yahoo.com via user <user@flink.apache.org> wrote:

Hi Team,
In Flink 1.16.0, we would like to use some of the Table API Connectors for production. Kindly let me know if the below connectors are production ready or only for testing purposes.

Filesystem (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/): Source = Bounded and Unbounded Scan, Lookup; Sink = Streaming Sink, Batch Sink
Elasticsearch 6.x & 7.x (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/elasticsearch/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Opensearch 1.x & 2.x (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/opensearch/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Apache Kafka 0.10+ (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/): Source = Unbounded Scan; Sink = Streaming Sink, Batch Sink
Amazon DynamoDB (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/dynamodb/): Source = Not supported; Sink = Streaming Sink, Batch Sink
Amazon Kinesis Data Streams (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kinesis/): Source = Unbounded Scan; Sink = Streaming Sink
Amazon Kinesis Data Firehose (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/firehose/): Source = Not supported; Sink = Streaming Sink
JDBC (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/jdbc/): Source = Bounded Scan, Lookup; Sink = Streaming Sink, Batch Sink
Apache HBase 1.4.x & 2.2.x (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hbase/): Source = Bounded Scan, Lookup; Sink = Streaming Sink, Batch Sink
Apache Hive (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/)

Thanks and regards
Re: CSV File Sink in Streaming Use Case
Hi, as the doc says: "The BulkFormat reads and decodes batches of records at a time." So a bulk format is not tied to columnar formats, and a bulk writer for CSV is indeed implemented in the Flink code. Actually, you can use either Row or Bulk depending on how you would like to write the data. As for the CSV BulkFormat being undocumented and not public in flink-csv, I really don't know why; I guess the reason may be that Flink doesn't want to expose it in the DataStream API, only in the Table API.

Best regards,
Yuxia

From: "User"
To: "User"
Date: Tuesday, March 7, 2023, 7:35:47 PM
Subject: CSV File Sink in Streaming Use Case

Hi, I am working on a Java DataStream application and need to implement a file sink with CSV format. I see that I have two options here, Row and Bulk (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/filesystem/#format-types-1). So for CSV file distribution which one should I use, Row or Bulk? I think the documentation is confusing for file connectors, because I can see an example for PyFlink which uses a BulkWriter for CSV, but the same class is not public in flink-csv. So does Flink not support a CSV BulkWriter for Java? And the Table API file sink explicitly supports CSV for the Row format but fails to mention anything about CSV in the DataStream file sink. This is all really confusing. Any leads on this are much appreciated. Thanks
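Since the Table API file sink explicitly supports CSV as a row-based format, one workaround is to write CSV through a filesystem table instead of the DataStream FileSink. A minimal sketch (table name, schema and path are illustrative, not from the thread):

```sql
-- Filesystem sink table writing CSV, which the Table API documents as a row-based format
CREATE TABLE csv_sink (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/csv-out',
  'format' = 'csv'
);

-- A DataStream can be bridged to this table, e.g. by registering it via
-- StreamTableEnvironment.fromDataStream(...) and then INSERT INTO csv_sink SELECT ...
```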
Re: Example of dynamic table
What do you mean by "try the feature of dynamic table"? Do you want to learn the concept of dynamic tables [1], or user-defined sources & sinks [2] built on dynamic tables?

[1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/
[2]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/

Best regards,
Yuxia

----- Original Mail -----
From: "Jie Han"
To: "User"
Date: Wednesday, March 8, 2023, 7:54:06 AM
Subject: Example of dynamic table

Hello community! I want to try the feature of dynamic table but do not find examples in the official doc. Is this part missing?
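If the question is about the concept, any continuous query over a stream-backed table already demonstrates a dynamic table. A minimal sketch (table and column names are made up for illustration):

```sql
-- 'clicks' is assumed to be a table backed by a stream (e.g. a Kafka source table).
-- The aggregation below is a continuous query: its result is a dynamic table
-- whose rows are updated as new click events arrive.
SELECT user_id, COUNT(*) AS cnt
FROM clicks
GROUP BY user_id;
```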
Re: Is there any API method for dynamic loading of the UDF jar
Flink 1.17, which will be released soon, will support it [1] in the table environment [2]. There is no difference in performance between TableEnvironment method calls and TableEnvironment.executeSql; the APIs are different, but the runtime underneath is the same. You can choose either of them according to your needs/preference.

[1]: https://issues.apache.org/jira/browse/FLINK-27660
[2]: https://github.com/apache/flink/blob/56b124bcfd661a295ab8772d265c12de25f690ab/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java#L541

Best regards,
Yuxia

From: "neha goyal"
To: "User"
Date: Monday, February 27, 2023, 2:23:01 AM
Subject: Is there any API method for dynamic loading of the UDF jar

Hello, in Flink 1.16, the CREATE FUNCTION ... USING JAR functionality has been introduced, where we can specify the jar resources and the jar can be located in a remote file system such as HDFS/S3. I don't see an alternative method for the same functionality using TableEnvironment method calls; for example, createTemporarySystemFunction doesn't take any URI. Will these methods be provided in the future? Is there any difference in performance if we use TableEnvironment method calls vs TableEnvironment.executeSql for the same feature? Which one is recommended?

Thanks and regards
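Until the dedicated API lands in 1.17, the SQL route mentioned in the question can be driven through TableEnvironment.executeSql. A sketch of the Flink 1.16 syntax (function name, class and jar path are placeholders):

```sql
-- Register a UDF whose implementation lives in a remote jar
CREATE TEMPORARY FUNCTION my_udf
AS 'com.example.MyScalarFunction'
USING JAR 'hdfs:///udfs/my-udf.jar';

-- The function can then be used like any other:
-- SELECT my_udf(some_col) FROM some_table;
```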
Re: Inconsistent data format of flink-training-repo and learn-flink doc
It seems that in this PR[1], startTime and endTime were replaced with a single eventTime, but the doc was not updated. Could you please help create a JIRA[2] for it? And if you're interested in fixing it, you're welcome to contribute. [1] https://github.com/apache/flink-training/pull/36 [2] https://issues.apache.org/jira/projects/FLINK/issues Best regards, Yuxia From: "Zhongpu Chen" To: "User" Sent: Tuesday, February 21, 2023, 8:49:13 PM Subject: Inconsistent data format of flink-training-repo and learn-flink doc Hi, The data format specified in flink-training-repo ( https://github.com/apache/flink-training/tree/release-1.16 ) shows that a TaxiRide is either a start or an end event with an eventTime. However, the Java code in "Data Pipelines & ETL" ( https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/learn-flink/etl/ ) tries to get both "startTime" and "endTime", but in fact those are not defined in flink-training-repo. ```java Interval rideInterval = new Interval(ride.startTime, ride.endTime); ``` I think such an inconsistency would puzzle newcomers to Flink. -- Zhongpu Chen
Re: Flink SQL support array transform function
Maybe you can try with a non-lambda function. But TBH, I haven't seen any Flink UDF that accepts a function as a parameter in my previous experience. I'm afraid it's not allowed to pass a function as a parameter. Best regards, Yuxia From: "Xuekui" To: "yuxia" Cc: "fskmine", "Caizhi Weng", "User" Sent: Tuesday, February 21, 2023, 11:25:48 AM Subject: Re: Re: Flink SQL support array transform function Hi Yuxia, Thanks for your advice. By adding the hint, the type validation passes, but I still can't pass the function to this UDF. Here is my query: select array_transform(ids, id -> id + 1) from tmp_table The lambda function id -> id + 1 can't be passed because "->" is not supported in Calcite now. Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "- >" at line 3, column 40. at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660) Original Email Sender: "yuxia" < luoyu...@alumni.sjtu.edu.cn >; Sent Time: 2023/2/20 10:00 To: "Xuekui" < baixue...@foxmail.com >; Cc: "fskmine" < fskm...@gmail.com >; "Caizhi Weng" < tsreape...@gmail.com >; "User" < user@flink.apache.org >; Subject: Re: Flink SQL support array transform function Hi, Xuekui. As said in the exception stack, maybe you can try to provide a hint for the function's parameters: class ArrayTransformFunction extends ScalarFunction { def eval(@DataTypeHint("ARRAY") a: Array[Long], @DataTypeHint("RAW") function: Long => Long): Array[Long] = { a.map(e => function(e)) } } Hope it can help.
For more detail, please refer to the Flink doc[1]. [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference Best regards, Yuxia From: "Xuekui" To: "fskmine", "Caizhi Weng" Cc: "User" Sent: Thursday, February 16, 2023, 10:54:05 AM Subject: Re: Flink SQL support array transform function Hi Caizhi, I've tried to write a UDF to support this function, but I found I can't pass the function parameter to the UDF because the data type of the function is not supported. An exception is thrown in SQL validation. My UDF code: class ArrayTransformFunction extends ScalarFunction { def eval(a: Array[Long], function: Long => Long): Array[Long] = { a.map(e => function(e)) } } Exception: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'transform'. at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660) at SQLTest$.main(SQLTest.scala:44) at SQLTest.main(SQLTest.scala) Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'transform'. 
at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) at java.util.Optional.flatMap(Optional.java:241) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98) at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245) at org.apache.calcite.sql.validate.SqlValidatorImp
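Since the SQL parser rejects the `->` lambda syntax, a practical workaround in the spirit of Yuxia's "non-lambda function" suggestion is to bake each transformation into its own UDF rather than passing a function. The sketch below is plain Java (the thread uses Scala) with a made-up class name; in a real job this logic would live in a ScalarFunction's eval method registered as its own function.

```java
import java.util.Arrays;

// Hypothetical workaround sketch: instead of a higher-order UDF that takes a
// lambda (which Flink SQL's parser rejects), register one UDF per operation.
// The class name and operation are illustrative only.
public class ArrayAddOneSketch {
    // Equivalent of `array_transform(ids, id -> id + 1)` with the lambda fixed.
    static long[] eval(long[] a) {
        return Arrays.stream(a).map(e -> e + 1).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(eval(new long[] {1L, 2L, 3L})));
        // prints [2, 3, 4]
    }
}
```

The downside is one registered function per transformation, but every parameter type involved (`ARRAY<BIGINT>` in, `ARRAY<BIGINT>` out) is then a type the planner can infer.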
Re: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API
Hi, Daniel Henneberger. Thanks for reporting this. It seems like a bug to me. Could you please help create a Jira[1] for it? As a workaround, is it possible not to use UNNEST? Maybe you can try the explode function, since the Flink planner rewrites UNNEST to the explode function in its implementation[2]. [1] https://issues.apache.org/jira/projects/FLINK/issues/ [2] https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala Best regards, Yuxia From: "Daniel Henneberger" To: "User" Sent: Wednesday, February 22, 2023, 5:35:02 AM Subject: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API Dear Apache Flink community, I could use some help with a serialization issue I'm having while using the Table API. Specifically, I'm trying to deserialize a serialized CompiledPlan, but I'm running into trouble with the UNNEST_ROWS operation. It seems that the CompiledPlan deserializer isn't looking up any functions in the BuiltInFunctionDefinitions class, which is causing the de-serialization to fail. Do any of you have experience with this issue or know of a workaround for serializing a Table API plan? Below is code to replicate. Thanks, Daniel Henneberger
```java
private void test() {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);
    // Create a table of values
    Table table = tEnv.fromValues(createNestedDatatype(),
            Row.of(List.of(Row.of("nested")), "name"));
    tEnv.createTemporaryView("table1", table);
    // Invoke the unnest operation
    Table unnested = tEnv.sqlQuery("SELECT name, nested \n"
            + "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)");
    StatementSet statementSet = tEnv.createStatementSet();
    statementSet.addInsert(TableDescriptor.forConnector("print").build(), unnested);
    // Serialize the plan
    CompiledPlan plan = statementSet.compilePlan();
    String json = plan.asJsonString();
    // Attempt to load the plan
    // This fails with the error 'Could not resolve internal system function
    // '$UNNEST_ROWS$1'. This is a bug, please file an issue.'
    CompiledPlan plan2 = tEnv.loadPlan(PlanReference.fromJsonString(json));
    plan2.execute().print();
}

private DataType createNestedDatatype() {
    return DataTypes.ROW(
            DataTypes.FIELD("arr", DataTypes.ARRAY(
                    DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING())))),
            DataTypes.FIELD("name", DataTypes.STRING()));
}
```
Re: Metrics or runtimeContext in global commit
It seems there is no other way to get the runtimeContext in a global committer. To me, it seems reasonable to propose this as a feature. I have added the Flink dev list for more attention/discussion among the Flink devs. Best regards, Yuxia - Original Message - From: "Tobias Fröhlich" To: "User" Sent: Tuesday, February 14, 2023, 9:26:34 PM Subject: Metrics or runtimeContext in global commit Dear Flink team, I would like to use metrics (which are then written to an InfluxDB) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection> committables) that I use for the global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology. The problem is: when I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because it is not an operator. The only solution I found was to clone the Flink source code and amend it in the following way: 1. declaring an abstract class "CommitterWithRuntimeContext" that implements Committer and has: - an additional field for the runtimeContext - a setter and getter for this field - an abstract method "void init()" 2. in the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext), adding the following lines at the end: if (committer instanceof CommitterWithRuntimeContext) { ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext()); ((CommitterWithRuntimeContext) committer).init(); } I can then implement the method CommitterWithRuntimeContext::init() in our code and call CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext. Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that gives the user access to the runtimeContext? Best regards and thanks in advance, Tobias Fröhlich
Re: Flink SQL support array transform function
Hi, Xuekui. As said in the exception stack, maybe you can try to provide a hint for the function's parameters: class ArrayTransformFunction extends ScalarFunction { def eval(@DataTypeHint("ARRAY") a: Array[Long], @DataTypeHint("RAW") function: Long => Long): Array[Long] = { a.map(e => function(e)) } } Hope it can help. For more detail, please refer to the Flink doc[1]. [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference Best regards, Yuxia From: "Xuekui" To: "fskmine", "Caizhi Weng" Cc: "User" Sent: Thursday, February 16, 2023, 10:54:05 AM Subject: Re: Flink SQL support array transform function Hi Caizhi, I've tried to write a UDF to support this function, but I found I can't pass the function parameter to the UDF because the data type of the function is not supported. An exception is thrown in SQL validation. My UDF code: class ArrayTransformFunction extends ScalarFunction { def eval(a: Array[Long], function: Long => Long): Array[Long] = { a.map(e => function(e)) } } Exception: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'transform'. 
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660) at SQLTest$.main(SQLTest.scala:44) at SQLTest.main(SQLTest.scala) Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'transform'. at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) at java.util.Optional.flatMap(Optional.java:241) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98) at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275) at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724) at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147) ... 6 more Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'udf.ArrayTransformFunction'. Please check for implementation mistakes and/or provide a corresponding hint. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83) at org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143) at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160) ... 17 more Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping. at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333) at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117) at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161) at org.apache.fli
Re: KafkaSink handling message size produce errors
Hi, Hatem. I think there is no way to catch the exception and then ignore it in the current implementation of KafkaSink. You may need to extend KafkaSink. Best regards, Yuxia From: "Hatem Mostafa" To: "User" Sent: Thursday, February 16, 2023, 9:32:44 PM Subject: KafkaSink handling message size produce errors Hello, I am writing a Flink job that reads from and writes into Kafka; it uses a window operator and eventually writes the result of the window into a Kafka topic. The accumulated data can exceed the maximum message size after compression at the producer level. I want to be able to catch the exception coming from the producer and ignore this window. I could not find a way to do that in KafkaSink ( https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink ); is there a way to do so? I attached here an example of an error that I would like to handle gracefully. This question is similar to one that was asked on Stack Overflow ( https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink ), but the answer is relevant only for older versions of Flink. Regards, Hatem
Re: Non-Determinism in Table-API with Kafka and Event Time
Hi, Theo. I'm wondering what the event-time-windowed query you are using looks like. For example, how do you define the watermark? Considering you read records from 10 partitions, it may well be that the records arrive at the window process operator out of order. Is it possible that the watermark has already passed the window end, but some records for that window are still arriving? If that's the case, the set of records used to calculate the result may differ from run to run, resulting in non-deterministic results. Best regards, Yuxia - Original Message - From: "Theodor Wübker" To: "User" Sent: Sunday, February 12, 2023, 4:25:45 PM Subject: Non-Determinism in Table-API with Kafka and Event Time Hey everyone, I am experiencing non-determinism in my Table API program at the moment and (as a relatively inexperienced Flink and Kafka user) I can't really explain to myself why it happens. I have a topic with 10 partitions and a bit of data on it. Now I run a simple SELECT * query on this that moves some attributes around and writes everything to another topic with 10 partitions. Then, on this topic, I run an event-time-windowed query. Now I experience non-determinism: the results of the windowed query differ with every execution. I thought this might be because the SELECT query wrote the data to the partitioned topic without keys, so I tried it again with the same key I used for the original topic. It resulted in the exact same topic structure. Now when I run the event-time-windowed query, I get incorrect results (too few result entries). I have already read a lot of the docs on this and can't seem to figure it out. I would much appreciate it if someone could shed a bit of light on this. Is there anything in particular I should be aware of when reading partitioned topics and running an event-time query on them? Thanks :) Best, Theo
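The effect Yuxia describes can be reproduced without Flink: with a bounded-out-of-orderness watermark, a window fires once the watermark passes its end, and records arriving after that are dropped as late; whether a given record arrives before or after the firing depends on the order partitions happen to be read in. Below is a toy plain-Java simulation of those semantics, not Flink's actual implementation.

```java
// Toy simulation of event-time windowing with a bounded-out-of-orderness
// watermark (delay = 0 ms, as with Duration.ZERO). Not Flink's real code,
// just the semantics: a record is dropped if its window has already fired
// by the time it arrives.
public class LateRecordSketch {
    // Counts records landing in the window [0, windowEnd), processed in order.
    static int countInWindow(long[] timestamps, long windowEnd) {
        long watermark = Long.MIN_VALUE;
        boolean fired = false;
        int counted = 0;
        for (long ts : timestamps) {
            if (!fired && ts < windowEnd) counted++;   // record lands in the window
            watermark = Math.max(watermark, ts);       // watermark = max timestamp - 0
            if (watermark >= windowEnd) fired = true;  // window closes
        }
        return counted;
    }

    public static void main(String[] args) {
        long end = 5000L;
        // Same records, two interleavings across Kafka partitions:
        System.out.println(countInWindow(new long[] {1000, 2000, 6000}, end)); // prints 2
        System.out.println(countInWindow(new long[] {1000, 6000, 2000}, end)); // prints 1
    }
}
```

The second interleaving drops the ts=2000 record because ts=6000 arrived first and closed the window, which is exactly the "too few result entries" symptom; a larger out-of-orderness bound (or per-key ordering) makes the result stable.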
Re: Flink Hudi HMS Catalog problem
Hi, Flink provides HiveCatalog, which can store native Hive tables as well as other types of Flink tables (more exactly, a DDL mapping). With it, Flink can access Hive tables and other Flink tables alike. Does it meet your requirement? Best regards, Yuxia From: "melin li" To: "User" Sent: Wednesday, February 8, 2023, 5:51:34 PM Subject: Flink Hudi HMS Catalog problem Flink SQL reads and writes Hudi and synchronizes Hive tables via the Hudi HMS Catalog. If a Hive database has both Parquet tables and Hudi tables, two different Flink catalogs need to be registered, which causes problems and is not very friendly for data analysts. Spark does not have this problem: you can use the spark_catalog catalog to access both Hudi and Parquet tables. Not sure if this problem should be solved in Hudi or in Flink?
Re: Seeking suggestions for ingesting large amount of data from S3
Hi, Eric. Thanks for reaching out. I'm wondering how you use the Table API to ingest the data. Since OOM is a very general symptom, do you have any clue about its cause? Maybe you can use jmap to see what occupies most of the memory. If you find it, you can try to figure out the reason: is it caused by a memory shortage or by something else? Btw, have you ever tried Flink SQL to ingest the data? Does the OOM still happen? Best regards, Yuxia From: "Yang Liu" To: "User" Sent: Friday, February 10, 2023, 5:10:49 AM Subject: Seeking suggestions for ingesting large amount of data from S3 Hi all, We are trying to ingest a large amount of data (20 TB) from S3 using the Flink filesystem connector to bootstrap a Hudi table. The data is well partitioned in S3 by date/time, but we have been facing OOM issues in the Flink jobs, so we wanted to update the Flink job to ingest the data chunk by chunk (partition by partition) with some kind of looping instead of all at once. Curious what's the recommended way to do this in Flink. I believe this should be a common use case, so I hope to get some ideas here. We have been using the Table API, but are open to other APIs. Thanks & Regards Eric
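One simple shape for the partition-by-partition loop Eric describes is to drive one bounded INSERT per date partition from a driver loop. The sketch below only generates the per-partition SQL statements; the table and column names (`hudi_sink`, `s3_source`, `dt`) are made up for illustration, and in a real job each statement would be submitted via `tableEnv.executeSql(...)` with a wait for completion before the next.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical chunked-ingestion driver: instead of one giant INSERT over the
// whole dataset, build one bounded INSERT per date partition. All identifiers
// in the SQL string are illustrative, not from the thread.
public class ChunkedIngestSketch {
    static List<String> perPartitionInserts(LocalDate start, LocalDate endExclusive) {
        List<String> statements = new ArrayList<>();
        for (LocalDate d = start; d.isBefore(endExclusive); d = d.plusDays(1)) {
            statements.add(
                "INSERT INTO hudi_sink SELECT * FROM s3_source WHERE dt = '" + d + "'");
        }
        return statements;
    }

    public static void main(String[] args) {
        perPartitionInserts(LocalDate.of(2023, 2, 1), LocalDate.of(2023, 2, 4))
                .forEach(System.out::println); // prints three INSERT statements
    }
}
```

Running each chunk as its own bounded job bounds the working set per run, which is the point of the loop when the whole dataset does not fit the cluster's memory budget.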
Re: Unsubscribe
Hi. To unsubscribe, you should send an email to user-unsubscr...@flink.apache.org with any content or subject. Please see more in the Flink doc[1]. [1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list Best regards, Yuxia From: "liang ji" To: "User" Sent: Wednesday, February 8, 2023, 2:10:03 PM Subject: Unsubscribe
Re: Unsubscribe
Hi, all. To unsubscribe, you can send an email to user-unsubscr...@flink.apache.org with any content or subject. Please see more in the Flink doc[1]. [1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list Best regards, Yuxia From: "Ragini Manjaiah" To: "Soumen Choudhury" Cc: "User" Sent: Wednesday, February 8, 2023, 11:06:30 AM Subject: Re: Unsubscribe Hi Soumen, I want to unsubscribe from this mailing list. Thanks & Regards, Ragini Manjaiah On Fri, Feb 3, 2023 at 4:07 PM Soumen Choudhury < sou@gmail.com > wrote: -- Regards, Soumen Choudhury Cell: +91865316168 mail to: sou@gmail.com
Re: I want to subscribe users' questions
Maybe you will also be interested in joining the Flink Slack; here is my invite link: https://join.slack.com/t/apache-flink/shared_invite/zt-1obpql04h-R3o5XM8d~Siyl3KGldkl2Q Best regards, Yuxia - Original Message - From: "guanyuan chen" To: "User", "user-zh" Sent: Friday, February 3, 2023, 7:48:55 PM Subject: I want to subscribe users' questions Hi, My name is Guanyuan Chen. I am a big data development engineer in the Tencent WeChat department, China. I have 4 years of experience in Flink development, and I want to subscribe to Flink's development news and would be glad to help with developing Flink jobs. Thanks a lot.
Re: Unable to do event time window aggregation with Kafka source
Hi, Lucas. What do you mean by saying "unable to do event time window aggregation with watermarkedStream"? What exception does it throw? Best regards, Yuxia From: "wei_yuze" To: "User" Sent: Tuesday, February 7, 2023, 1:43:59 PM Subject: Unable to do event time window aggregation with Kafka source Hello! I was unable to do event time window aggregation with a Kafka source, but had no problem with a "fromElements" source. The code is attached below. It has two data sources, named "streamSource" and "kafkaSource" respectively. The program works well with "streamSource", but not with "watermarkedStream".
```java
public class WindowReduceTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use a fromElements source
        DataStreamSource<Event2> streamSource = env.fromElements(
                new Event2("Alice", "./home", "2023-02-04 17:10:11"),
                new Event2("Bob", "./cart", "2023-02-04 17:10:12"),
                new Event2("Alice", "./home", "2023-02-04 17:10:13"),
                new Event2("Alice", "./home", "2023-02-04 17:10:15"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16"));

        // Use a Kafka source
        JsonDeserializationSchema<Event2> jsonFormat = new JsonDeserializationSchema<>(Event2.class);
        KafkaSource<Event2> source = KafkaSource.<Event2>builder()
                .setBootstrapServers(Config.KAFKA_BROKERS)
                .setTopics(Config.KAFKA_TOPIC)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(jsonFormat)
                .build();
        DataStreamSource<Event2> kafkaSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafkaSource.print();

        // Generate watermarks, extracting the time from the data as event time
        SingleOutputStreamOperator<Event2> watermarkedStream =
                kafkaSource.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event2>() {
                                    @Override
                                    public long extractTimestamp(Event2 element, long recordTimestamp) {
                                        SimpleDateFormat simpleDateFormat =
                                                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.getTime());
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        long time = date.getTime();
                                        System.out.println(time);
                                        return time;
                                    }
                                }));

        // Window aggregation
        watermarkedStream.map(new MapFunction<Event2, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event2 value) throws Exception {
                        // Convert the data into a tuple for easier computation
                        return Tuple2.of(value.getUser(), 1L);
                    }
                })
                .keyBy(r -> r.f0)
                // Set a tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                       Tuple2<String, Long> value2) throws Exception {
                        // Define the accumulation rule; when the window closes,
                        // emit the accumulated result downstream
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print("Aggregated stream");

        env.execute();
    }
}
```
Notably, if TumblingEventTimeWindows is replaced with TumblingProcessingTimeWindows, the program works well even with "watermarkedStream". Thanks for your time! Lucas
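The timestamp-extraction and window-assignment arithmetic in the job above can be checked in isolation, without Kafka or Flink. The plain-Java sketch below parses the sample events with the "yyyy-MM-dd HH:mm:ss" pattern they imply, pins the time zone to UTC so the numbers are reproducible (the original code uses the JVM default), and computes which 5-second tumbling window each timestamp falls into; `windowStart` here mirrors the usual epoch-aligned assignment, not Flink's actual window assigner code.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Plain-Java check of the extraction + window-assignment steps from the thread.
// Time zone pinned to UTC for reproducibility; window alignment is the common
// epoch-aligned rule, assumed for illustration.
public class EventTimeSketch {
    static long extractTimestamp(String time) {
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        f.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return f.parse(time).getTime();
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }

    // Start of the tumbling event-time window a timestamp falls into.
    static long windowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    public static void main(String[] args) {
        long t1 = extractTimestamp("2023-02-04 17:10:11");
        long t2 = extractTimestamp("2023-02-04 17:10:13");
        long t3 = extractTimestamp("2023-02-04 17:10:15");
        // t1 and t2 share a 5s window; t3 starts the next one.
        System.out.println(windowStart(t1, 5000) == windowStart(t2, 5000)); // prints true
        System.out.println(windowStart(t3, 5000) == windowStart(t1, 5000)); // prints false
    }
}
```

If the extraction itself is correct, the remaining suspects are the watermark settings: with `Duration.ZERO` out-of-orderness and multiple Kafka partitions, idle partitions or interleaving can hold back or jump the watermark and suppress the event-time window output.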
Re: Design decisions around flink table store
Hi, Bright. Thanks for reaching out. That's a really good question. Briefly speaking, the reason is that neither Hudi nor Iceberg is efficient for updating. Also, the FLIP for flink-table-store explains why not Hudi [1]: "Why doesn't FileStore use Hudi directly? 1: Hudi aims to support the update of upsert, so it needs to forcibly define the primary key and time column. It is not easy to support all changelog types. 2: The update of Hudi is based on the index (currently there are BloomFilter and HBase). The data in the bucket is out of order. Every merge needs to be reread and rewritten, which is expensive. We need fast update storage; LSM is more suitable." Also, I have added Jingsong Li to the mail thread; he is the creator/maintainer of flink-table-store. Maybe he can provide more detail. [1] https://cwiki.apache.org/confluence/display/Flink/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage#FLIP188:IntroduceBuiltinDynamicTableStorage-UsingHudi Best regards, Yuxia From: "graceking lau" To: "User" Sent: Monday, February 6, 2023, 9:24:31 AM Subject: Design decisions around flink table store Hi there, Recently I had a chance to get to know the flink-table-store project. I was attracted by the idea behind it at first glance. After reading the docs, I've had a question in my head for a while. It's about the design of the file storage. It looks like it could be implemented on top of other popular open-source libraries rather than creating a totally new (LSM-tree-based) component. Hudi or Iceberg looks like a good choice, since they both support saving and querying changelogs. If it were done like this, there would be no need to create a component for the other related computation engines (Spark, Hive or Trino), since they are already supported by Hudi or Iceberg. That looks like a better solution to me instead of reinventing the wheel. 
So, here are my questions: Is there any issue with writing data as Hudi or Iceberg? Why weren't they chosen in the initial design? Looking forward to your answer! (I don't know if this is a good way to ask questions here, but I didn't find another way yet. If it's not OK to ask on the mailing list, could someone please point me in the right direction?) Best regards, Bright.
Re: How to add permission validation? flink reads and writes hive table
Hi, melin li. Could you please explain a bit more about what a unified access check in Flink would look like? Best regards, Yuxia From: "melin li" To: "User" Sent: Wednesday, February 1, 2023, 2:39:15 PM Subject: How to add permission validation? flink reads and writes hive table Flink supports both SQL and jar job types. How can we implement a unified access check in Flink? Spark supports extensions; Flink lacks extensions.
Re: Custom catalog implementation - getting table schema for computed columns
Hi, > about the question: can I assume that ResolvedCatalogTable will always be the runtime type? Sorry, I don't really understand your question. Why do you make such an assumption? Best regards, Yuxia From: "Krzysztof Chmielewski" To: "User" Sent: Saturday, January 21, 2023, 3:13:12 AM Subject: Re: Custom catalog implementation - getting table schema for computed columns Ok, so now I see that the runtime type of the "table" parameter is ResolvedCatalogTable, which has the method getResolvedSchema. So I guess my question is: can I assume that ResolvedCatalogTable will always be the runtime type? On Fri, Jan 20, 2023 at 7:27 PM Krzysztof Chmielewski < krzysiek.chmielew...@gmail.com > wrote: Hi, I'm implementing a custom catalog where, for "create table", I need to get the table's schema, both column names and types, from the DDL. Now the catalog's createTable method has a "CatalogBaseTable table" argument. CatalogBaseTable has a deprecated "getSchema" and suggests using getUnresolvedSchema instead. I was able to resolve the schema types for physical columns, but I'm struggling with computed columns [1]. To be more precise, I'm struggling to get/resolve the type of this field. I see that all the implementations that would be needed to resolve the underlying expression of UnresolvedComputedColumn are marked as @Internal. On the other hand, the deprecated "getSchema" has the proper type for this computed column. I'm wondering now what I should do. Should I use the deprecated API that already has what I need, or should I use the suggested API and somehow try to resolve the type using @Internal classes, which also does not seem safe? I would appreciate any hint here. 
[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#:~:text=BIGINT%2C%20%60name%60%20STRING)-,Computed%20Columns,-Computed%20columns%20are
Re: Custom catalog implementation - getting table schema for computed columns
Hi. Just FYI, I have seen that some catalogs still use the deprecated TableSchema, e.g. in the Flink Hive and Iceberg connectors. But it's in Flink's plan to drop the deprecated TableSchema [1], so in the long term, using the new Schema API seems the better choice. As for the case of the catalog's createTable method: from the code base [2], the passed CatalogBaseTable should be an instance of ResolvedCatalogBaseTable, from which you can get the resolved schema. From the commit history [3], since Flink 1.13 the passed CatalogBaseTable has been an instance of ResolvedCatalogBaseTable. I think you can cast it to ResolvedCatalogBaseTable and get the resolved schema. But please remember, the cast will fail when the Flink version is lower than 1.13, since only from Flink 1.13 on is the passed CatalogBaseTable an instance of ResolvedCatalogBaseTable. [1] https://issues.apache.org/jira/browse/FLINK-29072 [2] https://github.com/apache/flink/blob/75a92efd7b35501698e5de253e5231d680830c16/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java#L654 [3] https://issues.apache.org/jira/browse/FLINK-21396 Best regards, Yuxia From: "Krzysztof Chmielewski" To: "User" Sent: Saturday, January 21, 2023, 2:27:25 AM Subject: Custom catalog implementation - getting table schema for computed columns Hi, I'm implementing a custom catalog where, for "create table", I need to get the table's schema, both column names and types, from the DDL. Now the catalog's createTable method has a "CatalogBaseTable table" argument. CatalogBaseTable has a deprecated "getSchema" and suggests using getUnresolvedSchema instead. I was able to resolve the schema types for physical columns, but I'm struggling with computed columns [1]. To be more precise, I'm struggling to get/resolve the type of this field. I see that all the implementations that would be needed to resolve the underlying expression of UnresolvedComputedColumn are marked as @Internal. 
On the other hand, the deprecated "getSchema" has the proper type for this computed column. I'm wondering now what I should do. Should I use the deprecated API that already has what I need, or should I use the suggested API and somehow try to resolve the type using @Internal classes, which also does not seem safe? I would appreciate any hint here. [1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#:~:text=BIGINT%2C%20%60name%60%20STRING)-,Computed%20Columns,-Computed%20columns%20are
Re: request for link to join
Hi, all. Here is my invitation link: https://join.slack.com/t/apache-flink/shared_invite/zt-1obpql04h-R3o5XM8d~Siyl3KGldkl2Q Best regards, Yuxia From: "P Singh" To: "Tamir Sagi" , "Wai Chee Yau" , "User" Sent: Sunday, January 29, 2023 7:35:21 PM Subject: Re: request for link to join Unable to join, the link is asking me to contact an admin. Please do the needful. Get [ https://aka.ms/o0ukef | Outlook for iOS ] From: Tamir Sagi Sent: Sunday, January 29, 2023 1:52:51 PM To: Wai Chee Yau ; user@flink.apache.org Subject: Re: request for link to join Welcome Wai, https://flink.apache.org/community.html#slack From: Wai Chee Yau Sent: Sunday, January 29, 2023 9:48 AM To: user@flink.apache.org Subject: request for link to join Hi, can I please get an invite link to join Slack for Flink? Thanks
Re: Detect Table options override by Query Dynamic options
Hi, Krzysztof Chmielewski. I'm afraid there's no way to detect this in the Table Factory: the passed catalog table already contains the options overridden by query dynamic options, and there is no flag to identify them in the Table Factory. Best regards, Yuxia From: "Krzysztof Chmielewski" To: "User" Sent: Friday, December 30, 2022 12:26:01 AM Subject: Detect Table options override by Query Dynamic options Hi, I'm working on a custom Table Factory implementation. Is there a way to detect in the Table Factory's createDynamicTableSink/createDynamicTableSource which table DDL options were overridden by query dynamic options [1]? [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options Regards, Krzysztof Chmielewski
Re: Using TumblingEventTimeWindows on low traffic kafka topic
Yes, your understanding is correct. To handle this, you can define a watermark strategy that detects idleness and marks an input as idle. Please refer to these two documents [1][2] for more details. [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#idleness [2] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources Best regards, Yuxia From: "deepthi s" To: "User" Sent: Thursday, December 22, 2022 9:46:00 AM Subject: Using TumblingEventTimeWindows on low traffic kafka topic (Adding subject) On Wed, Dec 21, 2022 at 5:41 PM deepthi s <deepthi.sridha...@gmail.com> wrote: Hello, I am new to event-time processing and need some help. We have a Kafka source with very low qps and multiple topic partitions have no data for long periods of time. Additionally, data from the source can come out of order (within bounds) and the application needs to process the events in order per key. So we wanted to try and sort the events in the application. I am using BoundedOutOfOrdernessWatermarks for generating the watermarks and using TumblingEventTimeWindows to collect the keyed events and sort them in the ProcessWindowFunction. I am seeing that the window doesn't close at all, and based on my understanding it is because there are no events for some source partitions. All operators have the same parallelism as the source Kafka partition count. Pseudo code for my processing:

SingleOutputStreamOperator myStream = env
    .fromSource(setupSource(), WatermarkStrategy.noWatermarks(), "Kafka Source", TypeInformation.of(RowData.class))
    .map(rowData -> convertToMyEvent(rowData))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(misAlignmentThresholdMinutes))
            .withTimestampAssigner((event, timestamp) -> event.timestamp))
    // Key the events by urn which is the key used for the output kafka topic
    .keyBy((event) -> event.urn.toString())
    // Set up a tumbling window of misAlignmentThresholdMinutes
    .window(TumblingEventTimeWindows.of(Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
    .process(new EventTimerOrderProcessFunction())
    .sinkTo(setupSink());

Is the understanding correct that, based on the WatermarkStrategy I have, multiple operators will keep emitting LONG.MIN_VALUE - threshold if no events are read for those partitions, causing the downstream keyBy operator also to emit the LONG.MIN_VALUE - threshold watermark (as the min of all watermarks it sees from its input map operators), and so the window doesn't close at all? If yes, what is the right strategy to handle this? Is there a way to combine EventTimeTrigger with ProcessingTimeoutTrigger? -- Regards, Deepthi
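A sketch of the suggested fix, under the assumptions of the pseudo code above: keep the same bounded-out-of-orderness strategy but mark partitions idle after a timeout, so silent partitions no longer hold back the combined watermark. MyEvent and misAlignmentThresholdMinutes come from the original job; the one-minute idle timeout is an illustrative choice, not a recommendation:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Hedged sketch: a watermark strategy that detects idle inputs.
WatermarkStrategy<MyEvent> strategy =
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(
            Duration.ofMinutes(misAlignmentThresholdMinutes))
        .withTimestampAssigner((event, timestamp) -> event.timestamp)
        // After 1 minute without records, the source split is marked idle
        // and is excluded from the downstream min-watermark computation,
        // so windows fed by the active partitions can still fire.
        .withIdleness(Duration.ofMinutes(1));
```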
Re: unsubscribe
To unsubscribe, you should send an email to user-unsubscr...@flink.apache.org, as documented on the Flink official website [1]. [1] https://flink.apache.org/community.html#mailing-lists Best regards, Yuxia From: "Ayush" To: "User" Sent: Sunday, December 11, 2022 9:41:02 PM Subject: unsubscribe unsubscribe
Re: How to set disableChaining like streaming multiple INSERT statements in a StatementSet ?
Could you please post an image of the running job graph from the Flink UI? Best regards, Yuxia From: "hjw" To: "User" Sent: Thursday, December 8, 2022 12:05:00 AM Subject: How to set disableChaining like streaming multiple INSERT statements in a StatementSet ? Hi, I create a StatementSet that contains multiple INSERT statements. I found that the multiple INSERT tasks are organized in one operator chain when StatementSet.execute() is invoked. How can I set disableChaining, like in the streaming API, for multiple INSERT statements in the StatementSet API? env: Flink version: 1.14.4 -- Best, Hjw
Re: Registering serializer for RowData
Hi, what's the type of the input for the SortOperator? I mean, what's the TypeInformation: for example, PojoTypeInfo or RowTypeInfo? Best regards, Yuxia From: "Ken Krugler" To: "User" Sent: Wednesday, December 7, 2022 9:11:17 AM Subject: Registering serializer for RowData Hi there, I'm using the Hudi sink to write data, in bulk insert mode, and running into an issue where Hudi is unhappy because (I think) Flink is using the Kryo serializer for RowData records, instead of something that extends AbstractRowDataSerializer. It's this bit of (Hudi) code in SortOperator.java that fails:

AbstractRowDataSerializer inputSerializer =
    (AbstractRowDataSerializer) getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

And I get: Caused by: java.lang.ClassCastException: class org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be cast to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in unnamed module of loader 'app') at org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) … So I'm wondering if the Flink table code configures this serializer, and I need to do the same in my Java API-based workflow. Thanks, — Ken PS - This is with Flink 1.15.1 and Hudi 0.12.0 -- Ken Krugler http://www.scaleunlimited.com Custom big data solutions Flink, Pinot, Solr, Elasticsearch
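For context, a common way to make a DataStream of RowData carry a proper RowData serializer (instead of falling back to Kryo) is to supply InternalTypeInfo explicitly where the stream is created. A minimal sketch, assuming the row's logical type is known; the two-field layout here is purely illustrative:

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Hypothetical row layout: (id INT, name STRING).
RowType rowType = RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));

// InternalTypeInfo produces a RowDataSerializer (an AbstractRowDataSerializer),
// which is what operators like Hudi's SortOperator expect to find in the config.
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowType);

// Passing this TypeInformation where the stream is built keeps Kryo out, e.g.:
// DataStream<RowData> stream = env.fromSource(source, watermarks, "src", typeInfo);
```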
Re: Flink Table Kinesis sink not failing when sink fails
At which code line does the error message happen? Maybe the connector swallows the exception and only logs the error message, in which case the Flink job won't fail, since from the runtime's perspective no exception happened. Best regards, Yuxia From: "Dan Hill" To: "User" Sent: Wednesday, November 30, 2022 8:06:52 AM Subject: Flink Table Kinesis sink not failing when sink fails I set up a simple Flink SQL job (Flink v1.14.4) that writes to Kinesis. The job looks healthy but the records are not being written. I did not give enough IAM permissions to write to Kinesis. However, the Flink SQL job acts like it's healthy and checkpoints even though the Kinesis PutRecords call fails. I'd expect this error to kill the Flink job. I looked through the Flink Jira and the Flink user group but didn't see a similar issue. Is the silent failure a known issue? If the Flink job doesn't fail, it'll be hard to detect production issues. ``` 2022-11-29 23:30:27,587 ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader [] - [2022-11-29 23:30:27.578072] [0x1e3b][0x7f12ef8fc700] [error] [AWS Log: ERROR](AWSClient) HTTP response code: 400 Exception name: AccessDeniedException Error message: User: arn:aws:sts::055315558257:assumed-role/dev-workers-us-east-1b-202203101433138915000a/i-09e4f747a4bdbb1f0 is not authorized to perform: kinesis:ListShards on resource: arn:aws:kinesis:us-east-1:055315558257:stream/dan-dev-content-metrics because no identity-based policy allows the kinesis:ListShards action 6 response headers: connection: close content-length: 379 content-type: application/x-amz-json-1.1 date: Tue, 29 Nov 2022 23:30:27 GMT x-amz-id-2: q8kuplUOMJILzVU97YA+TYSyy6aozeoST+yws26rOkyzEUUZT0zKBdcMWUAjV/8RrnMeed/+em7CbjpwzGYEANgkwCihZWdC x-amzn-requestid: e4a39674-66fa-4dcd-b8a3-0e273e5e628a ```
Re: Weird Flink SQL error
Hi, Dan. I'm wondering what type of error you expect. IMO, most engines throw parse errors in this way, telling you that an unexpected token was encountered. Best regards, Yuxia From: "Dan Hill" To: "User" Sent: Wednesday, November 23, 2022 1:55:20 PM Subject: Weird Flink SQL error Hi. I'm hitting an obfuscated Flink SQL parser error. Is there a way to get better errors for Flink SQL? I'm hitting it when I wrap some of the fields in an inner Row. Works:

CREATE TEMPORARY VIEW `test_content_metrics_view` AS
SELECT
  DATE_FORMAT(TUMBLE_ROWTIME(rowtime, INTERVAL '1' DAY), 'yyyy-MM-dd'),
  platform_id,
  content_id
FROM content_event
GROUP BY platform_id, content_id, TUMBLE(rowtime, INTERVAL '1' DAY);

CREATE TABLE test_content_metrics (
  dt STRING NOT NULL,
  `platform_id` BIGINT,
  `content_id` STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'etl/test_content_metrics',
  'format' = 'json'
);

INSERT INTO `test_content_metrics` SELECT * FROM `test_content_metrics_view`;

Fails: Wrapping a couple parameters in a Row causes the following exception. Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1, column 119. Was expecting one of: ")" ... "," ... org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:40981) org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:40792) org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:25220) org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19925) org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:19581) [...]
CREATE TEMPORARY VIEW `test_content_metrics_view` AS
SELECT
  DATE_FORMAT(TUMBLE_ROWTIME(rowtime, INTERVAL '1' DAY), 'yyyy-MM-dd'),
  ROW(platform_id, content_id)
FROM content_event
GROUP BY platform_id, content_id, TUMBLE(rowtime, INTERVAL '1' DAY);

CREATE TABLE test_content_metrics (
  dt STRING NOT NULL,
  `body` ROW(
    `platform_id` BIGINT,
    `content_id` STRING
  )
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'etl/test_content_metrics',
  'format' = 'json'
);

INSERT INTO `test_content_metrics` SELECT * FROM `test_content_metrics_view`;
Re: Flink SQL JSON
Hi! Maybe you can map it to Flink's BINARY / VARBINARY type [1]. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#data-type-mapping Best regards, Yuxia From: "Timothy Bess" To: "User" Sent: Saturday, November 19, 2022 4:10:41 AM Subject: Flink SQL JSON Hi Flink Users, I use Flink SQL to ETL Kafka event data into tables we use for analysis, and I'm currently trying to use it to basically extract a few fields out of some JSON, but leave the original blob in a subdocument using the Elasticsearch connector. I see that there are `ROW` and `MAP`, but I don't see any data types for unstructured JSON blobs. This would be really useful for both `jsonb` columns in Postgres and subdocuments in Elasticsearch. Has anyone found a good solution for this? Thanks, Tim
Re: How to use lookup join sql hint
Could you please show us the detailed exception? You can find it in FLINK_HOME/log. Best regards, Yuxia From: "Si-li Liu" To: "User" Sent: Saturday, November 12, 2022 4:27:54 PM Subject: How to use lookup join sql hint I try to use this SQL to insert my data into Doris, and my Flink version is 1.16.0, after checking https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#join-hints . Tables scrm_admin_dept_user and scrm_admin_dept use the jdbc connector and scrm_admin_user uses the mysql-cdc connector.

insert into dim_scrm_user_dept_doris_sink
select /*+ LOOKUP('table'='scrm_admin_dept_user'), LOOKUP('table'='scrm_admin_dept') */
  a.id, b.dept_id, a.name as name, c.name as dept_name, a.tenant_id
from scrm_admin_user a
join scrm_admin_dept_user b on a.id = b.user_id
join scrm_admin_dept c on b.dept_id = c.id
where a.deleted = false and b.deleted = false and c.deleted = false;

But Flink rejects my SQL with this error message: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute SQL statement. After I remove this SQL hint, the SQL can be submitted successfully. Did I get anything wrong with this lookup join hint? And also, I'm new to Flink SQL, does this pipeline diagram mean I already use a lookup join? -- Best regards Sili Liu
Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API
Wow, cool! Thanks for your work. It'll definitely be helpful for users who want to migrate their batch jobs from the DataSet API to the DataStream API. Best regards, Yuxia - Original Message - From: "Etienne Chauchot" To: "dev" , "User" Sent: Monday, November 7, 2022 10:29:54 PM Subject: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API Hi everyone, In case some of you are interested, I just posted a blog article about migrating a real-life batch pipeline from the DataSet API to the DataStream API: https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html Best Etienne
Re: Question about UDF randomly processed input row twice
Thanks for your explanation. The execution plan for the SQL `INSERT INTO print_table SELECT * FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL` is:

StreamPhysicalSink(table=[default_catalog.default_database.print_table], fields=[id_in_bytes, id])
  StreamPhysicalCalc(select=[RandomUdf(id) AS id_in_bytes, id], where=[IS NOT NULL(RandomUdf(id))])
    StreamPhysicalTableSourceScan(table=[[default_catalog, default_database, datagenTable]], fields=[id])

From the plan we can see the UDF is called twice in the StreamPhysicalCalc, once in the projection and once in the filter, so it looks as if one row is processed twice. Best regards, Yuxia From: "Xinyi Yan" To: "yuxia" Cc: "User" Sent: Friday, November 4, 2022 5:28:30 AM Subject: Re: Question about UDF randomly processed input row twice Ok. The datagen with the sequence option can reproduce this issue easily, and it also results in an incorrect result. I have a sequence generated by datagen that starts from 1 to 5 and let the UDF randomly return either null or bytes. Surprisingly, not only has the UDF been executed twice, but the where clause also did not handle the `IS NOT NULL`. This is a big shock from my side; the where clause `IS NOT NULL` condition is a fundamental SQL feature and it should not break. I have updated my findings in FLINK-29855 (https://issues.apache.org/jira/browse/FLINK-29855), and here are the repro steps: Query: INSERT INTO print_table SELECT * FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL Result: +I[null, 1] +I[[50], 2] +I[null, 4] UDF:

public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    int randomNumber = ((int) (Math.random() * (10 - 1))) + 1;
    LOG.info("[*][*][*] input num is {} and random number is {}. [*][*][*]", intputNum, randomNumber);
    if (randomNumber % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}

Log:
2022-11-02 13:38:56,765 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:56,766 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:57,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:57,763 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483. 2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### 2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### 2022-11-02 13:39:02,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.
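As a side note, one commonly suggested way to keep the planner from inlining and re-evaluating the same UDF call in both the projection and the filter is to declare the function non-deterministic. This is a hedged sketch of the UDF from the thread, not a confirmed fix for FLINK-29855; whether it actually prevents the double evaluation depends on the planner version:

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch: the thread's UDF declared non-deterministic, so the planner
// must not assume two calls with the same argument are interchangeable.
public class RandomUdf extends ScalarFunction {

    public @DataTypeHint("BYTES") byte[] eval(@DataTypeHint("INT") Integer inputNum) {
        byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
        int randomNumber = ((int) (Math.random() * 9)) + 1;
        return randomNumber % 2 == 0 ? results : null;
    }

    @Override
    public boolean isDeterministic() {
        // Tells the planner the result may differ between calls.
        return false;
    }
}
```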
Re: Question about UDF randomly processed input row twice
The datagen connector may produce rows with the same values. But from my side, in Flink a UDF shouldn't process one row twice; otherwise it would be a critical bug. Best regards, Yuxia From: "Xinyi Yan" To: "User" Sent: Thursday, November 3, 2022 6:59:20 AM Subject: Question about UDF randomly processed input row twice Hi all, I found a weird UDF behavior: a single thread processes the UDF twice, see FLINK-29855 (https://issues.apache.org/jira/browse/FLINK-29855) for more details. Basically, I created a datagen table with a random integer (1 row per second) and passed this value into the UDF. Inside the UDF, I simply mod the input number, convert the integer to a byte array, and then log it for debugging purposes. As you can see, some of the rows have been called twice inside the UDF. Not sure if this duplicated UDF call is expected, and not sure why it doesn't constantly produce duplicated calls for all rows. In case of any concern about the local env setup, I only have 1 task manager and 1 task slot in my local Flink cluster. Thanks! UDF:

public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (intputNum % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}

Main class DDLs:

tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
tEnv.executeSql("CREATE TABLE datagenTable (\n" +
        "  id INT\n" +
        ") WITH (\n" +
        "  'connector' = 'datagen',\n" +
        "  'number-of-rows' = '100',\n" +
        "  'rows-per-second' = '1'\n" +
        ")");
tEnv.executeSql("CREATE TABLE print_table (\n" +
        "  id_in_bytes VARBINARY,\n" +
        "  id INT\n" +
        ") WITH (\n" +
        "  'connector' = 'print'\n" +
        ")");
tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL");

Logging:
2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
Re: why select limit so slow on yarn cluster
Such SQL should finish quickly. Could you please check which part costs the most time: starting the job, running the job, or other stuff? And how is the network? Btw, what are the logical plan and physical plan? You can use `explain select * from scrm_admin_role limit 10` to show the plan. Best regards, Yuxia From: "Si-li Liu" To: "User" Sent: Wednesday, November 2, 2022 12:17:52 AM Subject: why select limit so slow on yarn cluster I created a table using Flink SQL on a yarn session.

CREATE TEMPORARY TABLE `scrm_admin_role` (
  `id` bigint,
  `role_name` string,
  `sort` int,
  `type` tinyint,
  `status` tinyint,
  `tenant_id` bigint,
  `deleted` BOOLEAN,
  `create_time` TIMESTAMP,
  `update_time` TIMESTAMP,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.17.16.45:3306/willing_base',
  'username' = '*',
  'password' = '*',
  'table-name' = 'scrm_admin_role'
);

The origin table is very small, about 10k lines. Then I tried a select to check the data: select * from scrm_admin_role limit 10; It can retrieve the result, but very slowly, taking about 2-3 minutes. Could anyone tell me why, and how I could make it fast? -- Best regards Sili Liu
Re: Could not find any factory for identifier 'filesystem'
The dependency flink-connector-files is needed. Best regards, Yuxia From: "Pavel Penkov" To: "User" Sent: Tuesday, November 1, 2022 6:06:43 PM Subject: Could not find any factory for identifier 'filesystem' I'm trying to run a Flink job as a standalone program and getting the following error. Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. Available factory identifiers are: blackhole datagen kafka print upsert-kafka Here's a list of dependencies (Flink version is 1.16.0): libraryDependencies ++= Seq( // Flink "org.apache.flink" %% "flink-streaming-scala" % versions.flink, "org.apache.flink" %% "flink-table-api-scala" % versions.flink, "org.apache.flink" %% "flink-table-api-scala-bridge" % versions.flink, "org.apache.flink" % "flink-connector-kafka" % versions.flink, "org.apache.flink" % "flink-avro-confluent-registry" % versions.flink, "org.apache.flink" %% "flink-table-planner" % versions.flink, "org.apache.flink" % "flink-avro" % versions.flink, "org.apache.flink" % "flink-clients" % versions.flink, "org.apache.flink" % "flink-runtime" % versions.flink, "org.apache.flink" % "flink-runtime-web" % versions.flink, "org.apache.flink" % "flink-parquet" % versions.flink, // Hadoop "org.apache.hadoop" % "hadoop-client" % versions.hadoop, // Misc "org.rogach" %% "scallop" % "4.1.0", "ch.qos.logback" % "logback-classic" % "1.4.4" ) I've also tried to run it on a host that has Hadoop installed, setting HADOOP_CLASSPATH and HADOOP_CONF_DIR, but the result is the same.
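Concretely, assuming the sbt build quoted above, adding the following line to the libraryDependencies sequence should make the 'filesystem' factory discoverable, since flink-connector-files is the module that registers it (versions.flink is the existing version value from that build):

```scala
// Assumed addition to the sbt dependency list above.
"org.apache.flink" % "flink-connector-files" % versions.flink
```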
Re: SQL Lookup join on nested field
AFAIK, there's no plan/ticket for it. If you think it needs to be supported, you can create a ticket in JIRA [1] for it. [1] https://issues.apache.org/jira/projects/FLINK/summary Best regards, Yuxia From: "Krzysztof Chmielewski" To: "User" Sent: Tuesday, October 18, 2022 3:28:26 PM Subject: SQL Lookup join on nested field Hi all, I have found an old thread [1] where there was a question about SQL joins on nested fields. The conclusion was that (quote): "temporal table join doesn't support join on a nested join. In blink planner, a temporal table join will be translated into lookup join which will use the equality condition fields as the lookup keys." The thread is from 2020 and I was wondering if there is any plan/ticket about adding support for nested fields to lookup join/temporal join. I've checked this use case on Flink 1.15.2 and it seems still not supported. [1] https://lists.apache.org/thread/o3fc5lrqf6dbkl9pm0rp2mqyt7mcnsv3 Best Regards, Krzysztof Chmielewski
Re: Flink 1.15 Interval Join error after Deduplicate
The view A tries to do deduplication using event time, which will still produce update rows. If you use processing time for the deduplication, then the view A should produce only append-only rows. Best regards, Yuxia > On October 15, 2022 at 9:50 AM, liebin...@whu.edu.cn wrote: > > I had a problem with Interval Join after using Deduplicate. I'm using Flink > version 1.15. > > I want to use Flink's Interval Join for double-stream association, and my > first table needs to be de-duplicated. Here is my sample code. > > ``` > CREATE TEMPORARY TABLE `source` ( > id INT, > name STRING, > event_time TIMESTAMP(3), > WATERMARK FOR event_time AS event_time > ) WITH ( > 'connector' = 'datagen' > ); > > > CREATE TEMPORARY TABLE B ( > id INT, > `start` INT, > `end` INT, > event_time TIMESTAMP(3), > WATERMARK FOR event_time AS event_time > ) WITH ( > 'connector' = 'datagen' > ); > > create TEMPORARY view A as > select id, name, event_time from ( > select id, name, event_time, > row_number() over(partition by id, name, event_time order by event_time asc) > as rn > from source > ) > where rn = 1; > > SELECT * > FROM A, B > WHERE >A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND >A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND >B.event_time + INTERVAL '10' SECOND; > ``` > > I used the de-duplication to preserve the first row of data, so view A > should only produce insert rows, but running the SQL above produces the > following error. > > ``` > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't > support consuming update and delete changes which is produced by node > Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME]) > ``` > > How to perform Interval Join after using Deduplicate?
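A sketch of the processing-time variant, adapted from the view definition quoted above. Adding a proctime column to the source table is an assumption made for illustration; the rest mirrors the original query:

```sql
-- Assumed: the source table additionally exposes a processing-time attribute.
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time
) WITH ('connector' = 'datagen');

-- Deduplicate on processing time: keeping the first row per key with an
-- ascending proc_time order produces an append-only stream, which the
-- interval join can consume.
CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
         ROW_NUMBER() OVER (PARTITION BY id, name ORDER BY proc_time ASC) AS rn
  FROM source
) WHERE rn = 1;
```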
Re: Flink 1.15 Interval Join error after Deduplicate
> view A should only produce insert rows No, the view A will still produce update/delete rows. Best regards, Yuxia > On October 15, 2022 at 9:50 AM, liebin...@whu.edu.cn wrote: > > view A should only produce insert rows
Re: SQL Changes between 1.14 and 1.15?
Thanks for raising it. It seems to be a bug introduced by this PR [1]. I have created FLINK-29651 [2] to track it. [1] https://github.com/apache/flink/pull/19001 [2] https://issues.apache.org/jira/browse/FLINK-29651 Best regards, Yuxia > On October 14, 2022 at 9:19 PM, PACE, JAMES wrote: > > We've noticed the following difference in sql when upgrading from flink > 1.14.5 to 1.15.2 around characters that are escaped in an sql statement: > > This statement: > tableEnvironment.executeSql("select * from testTable WHERE lower(field1) > LIKE 'b\"cd\"e%'"); > produces a runtime error in flink 1.15.2, but executes properly in flink > 1.14.5 > > This can be worked around by escaping the backslash, changing the statement > to: > tableEnvironment.executeSql("select * from testTable WHERE lower(field1) > LIKE 'b\\\"cd\\\"e%'"); > > This code illustrates the issue: > > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.Schema; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > > public class TestCase3 { > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > > TestData testData = new TestData(); > testData.setField1("b\"cd\"e"); > DataStream stream = env.fromElements(testData); > stream.print(); > final StreamTableEnvironment tableEnvironment = > StreamTableEnvironment.create(env); > tableEnvironment.createTemporaryView("testTable", stream, > Schema.newBuilder().build()); > > // Works with Flink 1.14.x, flink runtime errors in 1.15.2.
> Uncomment to see runtime trace > //tableEnvironment.executeSql("select *, '1' as run from testTable > WHERE lower(field1) LIKE 'b\"cd\"e%'").print(); > // Works with 1.15.2 > tableEnvironment.executeSql("select * from testTable WHERE > lower(field1) LIKE 'b\\\"cd\\\"e%'").print(); > > env.execute("TestCase"); > } > > public static class TestData { > private String field1; > > public String getField1() { return field1; } > public void setField1(String field1) { this.field1 = field1; } > } > } > > Thanks > Jim
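For readers puzzled by the double escaping in the workaround: the backslashes are consumed once by the Java compiler, and (in 1.15.2, apparently due to the regression discussed above) the SQL layer expects the quotes to still be backslash-escaped in the string it receives. A small, stdlib-only demonstration of what each Java source literal actually hands to executeSql:

```java
public class EscapeDemo {
    public static void main(String[] args) {
        // In Java source, \" is just a double quote and \\ is a backslash.
        String plain = "b\"cd\"e%";       // the SQL layer receives: b"cd"e%
        String doubled = "b\\\"cd\\\"e%"; // the SQL layer receives: b\"cd\"e%
        assert plain.length() == 7;       // b " c d " e %
        assert doubled.length() == 9;     // b \ " c d \ " e %
        assert doubled.charAt(1) == '\\'; // the backslash survives into the SQL string
        System.out.println(plain);
        System.out.println(doubled);
    }
}
```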
Re: Sorting by source event time
You can change it to "ORDER BY eventTime" and it should work. You can sort on event time, but it must be in time-ascending order without 'limit'. If you still want a descending order, I think you can try setting the internal configuration `table.exec.sort.non-temporal.enabled` to true. But remember it's just experimental, and may bring unexpected behavior. Best regards, Yuxia - Original Message - From: "Noel OConnor" To: "User" Sent: Tuesday, September 27, 2022 2:10:36 AM Subject: Sorting by source event time Hi, I have a temporary view created from a datastream. tableEnv.createTemporaryView("productDetailsView", productStream, Schema.newBuilder() .columnByMetadata("eventTime", "TIMESTAMP_LTZ(3)", "rowtime", Boolean.TRUE) .watermark("eventTime", "SOURCE_WATERMARK()") .build()); and I'm trying to sort it using the following: Table resultTable2 = tableEnv.sqlQuery( "SELECT * FROM productDetailsView ORDER BY eventTime DESC"); but I get the following error: Caused by: org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported. Can you sort on event time or does it have to be part of the actual payload data type? Have I missed something obvious? cheers Noel
Re: Insert into JDBC table
"tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES (4000, 100, 'customer')");" should work. If not work, it seems to be a bug. >> "Flink dynamic table is just a link to real data" Yes, it's. >> Is there any way to create empty table? Or table with some values defined in >> Flink? Maybe you can try create table with Hive dialect[1] which enable you create a table in Hive using Flink SQL. Or you can try flink-table-store. AFAK, seems we can't create a table with some values defined directly. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_dialect/ [2] https://nightlies.apache.org/flink/flink-table-store-docs-master/ . Or Best regards, Yuxia - 原始邮件 - 发件人: "podunk" 收件人: "User" 发送时间: 星期一, 2022年 9 月 12日 下午 8:36:54 主题: Re: Insert into JDBC table I see I can only insert into JDBC table with select from another table, something like: tEnv.executeSql("INSERT INTO Customers SELECT customer_number, pid_no, name FROM another_table"); But what if I want to insert row that I created within Flink? For instance I made some calculation and I want to insert completely new row into table (it does not exist in any table)? Something like: tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES (4000, 100, 'customer')"); ? One more question - Flink dynamic table is just a link to real data (right?). Is there any way to create empty table? Or table with some values defined in Flink? Thanks for help, M. Sent: Friday, September 09, 2022 at 3:03 PM From: pod...@gmx.com To: user@flink.apache.org Subject: Insert into JDBC table Why this INSERT does not insert row in table (jdbc connection works, I can create 'Customers' table from MySQL table)? 
tEnv.executeSql("CREATE TABLE Customers (" + " customer_number INT, " + " pid_no INT, " + " name STRING, " + " PRIMARY KEY (customer_number) NOT ENFORCED" + " ) WITH ( " + " 'connector' = 'jdbc', " + " 'url' = 'jdbc:mysql://localhost:3306/test', " + " 'username' = 'some_user', " + " 'table-name' = 'customers', " + " 'password' = ''" + ")"); //This insert does nothing (not even error) tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES (4000, 100, 'customer')"); According to documentation (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/) it should work. Regards, Mike
Re: Access to Table environment properties/Job arguments from DynamicTableFactory
Have you ever checked DynamicTableFactory.Context#getConfiguration? Is it something that you're looking for? Best regards, Yuxia From: "Krzysztof Chmielewski" To: "User" Sent: Saturday, September 10, 2022 12:51:09 AM Subject: Access to Table environment properties/Job arguments from DynamicTableFactory Hi, is there a way to access the Table Environment configuration or job arguments from a DynamicTableFactory (Sink/Source)? I'm guessing no, but I just want to double check that I'm not missing anything here. To my understanding, we have access only to the table definition. Thanks, Krzysztof Chmielewski
Re: get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar
How do you use `flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar`? Do you use the SQL client? Do you put it in FLINK_HOME/lib? If it's for the SQL client, I think you can remove the jar from FLINK_HOME/lib, add it in the Flink SQL client using `add jar 'flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar'`, and set 'org.apache.commons.' to be loaded parent-first [1]. But I think the better way is to relocate the class. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default Best regards, Yuxia From: "Liting Liu (litiliu)" To: "User" Sent: Wednesday, August 31, 2022 5:14:35 PM Subject: get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar Hi, I got a NoSuchMethodError when using flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar. Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) Caused by: java.lang.NoSuchMethodError: org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String; at org.apache.flink.table.planner.plan.utils.RankProcessStrategy$UpdateFastStrategy.toString(RankProcessStrategy.java:129) at java.lang.String.valueOf(String.java:2994) at java.lang.StringBuilder.append(StringBuilder.java:136) at org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.explain(RelDescriptionWriterImpl.java:67) at org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.done(RelDescriptionWriterImpl.java:96) at org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:246) at org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription(FlinkRelNode.scala:50) at org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription$(FlinkRelNode.scala:46) at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank.getRelDetailedDescription(StreamPhysicalRank.scala:41) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:701) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1(FlinkChangelogModeInferenceProgram.scala:738) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1$adapted(FlinkChangelogModeInferenceProgram.scala:730) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitRankStrategies(FlinkChangelogModeInferenceProgram.scala:730) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:489) It seems there is an embedded StringUtils in flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar which conflicts with another class. What should I do? Do I have to manually exclude StringUtils.class from flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar?
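For reference, the parent-first approach from [1] is a one-line change in flink-conf.yaml. This is a sketch; check the key and your existing defaults against the classloading documentation for your Flink version before relying on it:

```yaml
# flink-conf.yaml: also resolve org.apache.commons.* classes parent-first,
# so the copy bundled inside the Hive connector jar is not picked up
classloader.parent-first-patterns.additional: org.apache.commons.
```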
Re: Failing to maven compile install Flink 1.15
Which Maven version do you use? It's recommended to use Maven 3.2.5. Best regards, Yuxia From: "hjw" <1010445...@qq.com> To: "User" Sent: Thursday, August 18, 2022 10:48:57 PM Subject: Failing to maven compile install Flink 1.15 I tried to run maven clean install on the Flink 1.15 parent, but it fails. An error happened while compiling flink-clients. Error log: Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.4:single (create-test-dependency) on project flink-clients: Error reading assemblies: Error locating assembly descriptor: src/test/assembly/test-assembly.xml [1] [INFO] Searching for file location: D:\learn\Code\Flink\FlinkSourceCode\Flink-1.15\flink\flink-clients\target\src\test\assembly\test-assembly.xml [2] [INFO] File: D:\learn\Code\Flink\FlinkSourceCode\Flink-1.15\flink\flink-clients\target\src\test\assembly\test-assembly.xml does not exist. [3] [INFO] File: D:\learn\Code\Flink\FlinkSourceCode\Flink-1.15\flink\src\test\assembly\test-assembly.xml does not exist. However, mvn clean package on the Flink 1.15 parent and on flink-clients alone is successful.
Re: Failing to compile Flink 1.9 with Scala 2.12
At least for Flink 1.15, it's recommended to use Maven 3.2.5. So I guess maybe you can try using a lower version of Maven. Best regards, Yuxia From: "Milind Vaidya" To: "Weihua Hu" Cc: "User" Sent: Friday, August 19, 2022 1:26:45 AM Subject: Re: Failing to compile Flink 1.9 with Scala 2.12 Hi Weihua, Thanks for the update. I do understand that, but right now it is not possible to update immediately to 1.15, so wanted to know what is the way out. - Milind On Thu, Aug 18, 2022 at 7:19 AM Weihua Hu <huweihua@gmail.com> wrote: Hi, Flink 1.9 has not been updated since 2020-04-24; it's recommended to use the latest stable version 1.15.1. Best, Weihua On Thu, Aug 18, 2022 at 5:36 AM Milind Vaidya <kava...@gmail.com> wrote: Hi, Trying to compile and build Flink jars based on Scala 2.12. Settings: Java 8, Maven 3.6.3 / 3.8.6. Many online posts suggest using Java 8, which is already in place. Building using Jenkins. Any clues as to how to get rid of it?
Plugin: net.alchim31.maven:scala-maven-plugin:3.3.2, args: -nobootcp, -Xss2m. Exception: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.3.2:compile (scala-compile-first) on project flink-table-planner-blink_2.12: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.3.2:compile (scala-compile-first) on project flink-table-planner-blink_2.12: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:212) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193) at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106) at org.apache.maven.cli.MavenCli.execute(MavenCli.java:863) at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:288) at org.apache.maven.cli.MavenCli.main(MavenCli.java:199) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289) at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415) at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356) Caused by: org.apache.maven.plugin.MojoExecutionException: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:593) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:207) ... 20 more Caused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:377) at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:160) at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:147) at scala_maven_executions.JavaMainCallerByFork.run(JavaMainCallerByFork.java:89) at scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:161) at scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99) at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:585)
Re: get state from window
Sorry for misleading. After some investigation, it seems UDTAGG can only be used in the Flink Table API. Best regards, Yuxia - Original Message - From: "yuxia" To: "user-zh" Cc: "User" Sent: Thursday, August 18, 2022 10:21:12 AM Subject: Re: get state from window > does flink sql support UDTAGG? Yes, Flink SQL supports UDTAGG. Best regards, Yuxia - Original Message - From: "曲洋" To: "user-zh", "User" Sent: Thursday, August 18, 2022 10:03:24 AM Subject: get state from window Hi dear engineers, I have one question: does Flink streaming support getting the state? I override the open method in the map processor, initializing Hi dear engineers, One small question: does Flink SQL support UDTAGG (user-defined table aggregate function)? It seems to be supported only in the Flink Table API. If it is not supported in Flink SQL, how can I define an aggregate UDF which could output multiple rows to Kafka? Thanks for your help!
Re: get state from window
> does flink sql support UDTAGG? Yes, Flink SQL supports UDTAGG. Best regards, Yuxia - Original Message - From: "曲洋" To: "user-zh", "User" Sent: Thursday, August 18, 2022 10:03:24 AM Subject: get state from window Hi dear engineers, I have one question: does Flink streaming support getting the state? I override the open method in the map processor, initializing Hi dear engineers, One small question: does Flink SQL support UDTAGG (user-defined table aggregate function)? It seems to be supported only in the Flink Table API. If it is not supported in Flink SQL, how can I define an aggregate UDF which could output multiple rows to Kafka? Thanks for your help!
Re: without DISTINCT unique lines show up many times in FLINK SQL
Seems it's the same problem as the one discussed in [1]. [1] https://lists.apache.org/thread/3lvkd8hryb1zdxs3o8z65mrjyoqzs88l Best regards, Yuxia - Original Message - From: "Marco Villalobos" To: "User" Sent: Wednesday, August 17, 2022 12:56:44 PM Subject: without DISTINCT unique lines show up many times in FLINK SQL Hello everybody, When I perform this simple set of queries, a unique line from the source file shows up many times. I have verified many times that a unique line in the source shows up as much as 100 times in the select statement. Is this the correct behavior for Flink 1.15.1? FYI, it does show the correct results when I perform a DISTINCT query. Here is the SQL: CREATE TABLE historical_raw_source_template( `file.path` STRING NOT NULL METADATA, `file.name` STRING NOT NULL METADATA, `file.size` BIGINT NOT NULL METADATA, `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA, line STRING ) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'format' = 'raw' -- required: file system connector requires to specify a format ); CREATE TABLE historical_raw_source WITH ( 'path' = 's3://raw/' -- required: path to a directory ) LIKE historical_raw_source_template; SELECT `file.modification-time` AS modification_time, `file.path` AS file_path, line FROM historical_raw_source
Re: Is this a Batch SQL Bug?
Thanks for raising it. Yes, you're right. It's indeed a bug. The problem is that the RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only does a shallow copy of the produced data, so the final result will always be the last row's value. Could you please help create a JIRA to track it? Best regards, Yuxia - Original Message - From: "Marco Villalobos" To: "User" Sent: Thursday, August 18, 2022 6:08:33 AM Subject: Is this a Batch SQL Bug? Given this program: ```java package mvillalobos.bug; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class IsThisABatchSQLBug { public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" + "`file.path` STRING NOT NULL METADATA,\n" + "`file.name` STRING NOT NULL METADATA,\n" + "`file.size` BIGINT NOT NULL METADATA,\n" + "`file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" + "line STRING\n" + " ) WITH (\n" + "'connector' = 'filesystem', \n" + "'format' = 'raw'\n" + " );"); tableEnv.executeSql("CREATE TABLE historical_raw_source\n" + " WITH (\n" + "'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" + " ) LIKE historical_raw_source_template;"); final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute(); output.print(); } } ``` and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' directory: ```text one two three four five six seven eight nine ten ``` The print results are: ```text +++ | +I |ten | | +I |ten | | +I |ten
| | +I |ten | | +I |ten | | +I |ten | | +I |ten | | +I |ten | | +I |ten | | +I |ten | +++ 10 rows in set ``` I was expecting all rows to print. If this is not a bug, then what am I misunderstanding? I do notice that the transient field: private transient RecordCollector collector; in org.apache.flink.connector.file.table.DeserializationSchemaAdapter.LineBytesInputFormat becomes empty on each iteration, as though it failed to serialize correctly. Regardless, I don't know what's wrong. Any advice would deeply help. Marco A. Villalobos
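The shallow-copy failure mode Yuxia describes is not Flink-specific and can be reproduced with plain Java: when a format reuses one mutable record object and the reader buffers only the reference, every buffered entry ends up showing the last value read — exactly the repeated "ten" output above. A minimal sketch (class and method names are illustrative, not Flink's):

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseBugDemo {
    // Mutable record, standing in for the reused RowData instance
    static class Row { String line; }

    // deepCopy=false mimics the buggy shallow-copying reader
    static List<String> collect(boolean deepCopy) {
        String[] input = {"one", "two", "three"};
        Row reused = new Row();                    // the format reuses this one object
        List<Row> buffer = new ArrayList<>();
        for (String s : input) {
            reused.line = s;                       // format overwrites the shared instance
            if (deepCopy) {
                Row copy = new Row();              // fix: buffer an independent copy
                copy.line = reused.line;
                buffer.add(copy);
            } else {
                buffer.add(reused);                // bug: buffer the shared reference
            }
        }
        List<String> out = new ArrayList<>();
        for (Row r : buffer) out.add(r.line);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(false)); // [three, three, three] — like the "ten" output
        System.out.println(collect(true));  // [one, two, three]
    }
}
```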
Re: Metrics & Monitoring in Flink SQL
I'm afraid there's no document for it. But there's a FLIP [1] that defines the metrics that connectors (source/sink) should expose. And I think the official connectors will cover most of these metrics if possible. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics Best regards, Yuxia From: "casel.chen" To: "yuxia" Cc: "Salva Alcántara", "User" Sent: Sunday, July 17, 2022 12:00:11 PM Subject: Re: Re: Metrics & Monitoring in Flink SQL How can I get all metrics of those connectors shipped with the Flink release? Is there any document? At 2022-07-13 11:05:43, "yuxia" wrote: With Flink SQL, you can define your own source/sink metrics [1], but you can't define metrics for the intermediate operators. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/ Best regards, Yuxia From: "Salva Alcántara" To: "User" Sent: Wednesday, July 13, 2022 1:33:30 AM Subject: Metrics & Monitoring in Flink SQL I have a question regarding Flink SQL, which I'm getting into lately. So far, my experience is with the DataStream API mostly. In that context, it's easy for me to generate metrics for my operators. However, I'm just wondering which level of control there is regarding monitoring & metrics when working with Flink SQL. Is it possible to define "metrics for your queries"? Whatever that means and assuming that it makes any sense :laughing:. At least I should be able to generate typical metrics for common connectors, e.g., messages read/produced & things like accumulated lag for the Kafka connector, to give an example. Sorry for the vagueness, but I could not find a section for metrics & monitoring within the Flink SQL docs. Any guidance would be appreciated! Salva
Re: Metrics & Monitoring in Flink SQL
With Flink SQL, you can define your own source/sink metrics [1], but you can't define metrics for the intermediate operators. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/ Best regards, Yuxia From: "Salva Alcántara" To: "User" Sent: Wednesday, July 13, 2022 1:33:30 AM Subject: Metrics & Monitoring in Flink SQL I have a question regarding Flink SQL, which I'm getting into lately. So far, my experience is with the DataStream API mostly. In that context, it's easy for me to generate metrics for my operators. However, I'm just wondering which level of control there is regarding monitoring & metrics when working with Flink SQL. Is it possible to define "metrics for your queries"? Whatever that means and assuming that it makes any sense :laughing:. At least I should be able to generate typical metrics for common connectors, e.g., messages read/produced & things like accumulated lag for the Kafka connector, to give an example. Sorry for the vagueness, but I could not find a section for metrics & monitoring within the Flink SQL docs. Any guidance would be appreciated! Salva
Re: Parsing a JSON array string as a Flink SQL Array data type
I'm afraid there's no built-in function for this at hand. But you can write a UDF [1] to convert the JSON array string to a Flink array. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ Best regards, Yuxia - Original Message - From: "Abhishek Rai" To: "User", "Jordan Hannel" Sent: Wednesday, July 13, 2022 3:02:15 AM Subject: Parsing a JSON array string as a Flink SQL Array data type Hello! I'm trying to use the new JSON functions in Flink 1.15 to parse JSON input data. In particular, using JSON_QUERY, I'm able to extract JSON array elements from a larger JSON record. However, this function returns the JSON array as a string. I'd like to run this array through the SQL UNNEST operator, which takes an ARRAY data type as an input, not a string. So how do I convert a string-encoded JSON array to a Flink SQL Array data type, so I can use it with UNNEST? Unfortunately, I'm not able to locate any documentation on this. Thanks for your help! Abhishek
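For illustration, the conversion logic such a UDF's eval method could wrap might look like the following deliberately naive sketch. It only handles a flat JSON array of plain strings (no escapes, nesting, or embedded commas); a real UDF should parse with a proper JSON library such as Jackson:

```java
import java.util.Arrays;

public class JsonArrayToSqlArray {
    // Naive conversion of a flat JSON string array like ["a","b"] to String[].
    // Illustration only: escapes, nesting, and embedded commas are NOT handled.
    static String[] toArray(String json) {
        String body = json.trim();
        body = body.substring(1, body.length() - 1).trim(); // strip [ and ]
        if (body.isEmpty()) return new String[0];
        String[] parts = body.split(",");
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i].trim();
            parts[i] = p.substring(1, p.length() - 1);      // strip surrounding quotes
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toArray("[\"a\", \"b\", \"c\"]"))); // [a, b, c]
    }
}
```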
Re: Does Table API connector, csv, has some option to ignore some columns
For the JSON format, you only need to define the partial columns to be selected in the Flink DDL. But for the CSV format, it's not supported. In a CSV file, if there's no header, how can you map the incomplete columns defined in the Flink DDL to the original fields in the CSV file? Thus, you need to write all the columns so that we can do the mapping. If there's a header, we can do the mapping, and it should meet your requirement. However, the current implementation hasn't considered such a case. Best regards, Yuxia From: "podunk" To: "User" Sent: Tuesday, July 12, 2022 5:13:05 PM Subject: Re: Re: Does Table API connector, csv, has some option to ignore some columns This is really surprising. When you import data from a file, you really rarely need to import everything from that file. Most often it is several columns. So a program that reads the file should be able to do this - this is the ABC of working with data. Often the suggestion is "you can write your own script". Sure. I can. I can write the entire program here - from scratch. But I use a ready-made program to avoid writing my own scripts. Sent: Tuesday, July 12, 2022 at 12:24 AM From: "Alexander Fedulov" To: pod...@gmx.com Cc: "user" Subject: Re: Re: Does Table API connector, csv, has some option to ignore some columns Hi podunk, no, this is currently not possible: > Currently, the CSV schema is derived from table schema. [1] So the Table schema is used to define how Jackson CSV parses the lines and hence needs to be complete. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/ Best, Alexander Fedulov On Mon, Jul 11, 2022 at 5:43 PM <pod...@gmx.com> wrote: No, I did not mean that. I said 'Does Table API connector, CSV, has some option to ignore some columns in source file?'
Sent: Monday, July 11, 2022 at 5:28 PM From: "Xuyang" <xyzhong...@163.com> To: pod...@gmx.com Cc: user@flink.apache.org Subject: Re: Re: Does Table API connector, csv, has some option to ignore some columns Hi, did you mean `insert into table1 select col1, col2, col3 ... from table2`? If this doesn't meet your requirement, what about using a UDF to customize what you want at runtime? -- Best! Xuyang On 2022-07-11 16:10:00, pod...@gmx.com wrote: I want to control what I insert into the table, not what I get from the table. Sent: Monday, July 11, 2022 at 3:37 AM From: "Shengkai Fang" <fskm...@gmail.com> To: pod...@gmx.com Cc: "user" <user@flink.apache.org> Subject: Re: Does Table API connector, csv, has some option to ignore some columns Hi. In Flink SQL, you can select the columns that you want in the query. For example, you can use ``` SELECT col_a, col_b FROM some_table; ``` Best, Shengkai On Sat, Jul 9, 2022 at 01:48, <pod...@gmx.com> wrote: Does the Table API connector, CSV, have some option to ignore some columns in the source file? For instance, read only the first, second, ninth... but not the others? Or any other trick? CREATE TABLE some_table ( some_id BIGINT , ... ) WITH ( 'format' = 'csv' , ... )
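Outside of Flink, the "take only some columns" step is simple enough to sketch in plain Java, and it also illustrates Yuxia's point: with no header, the reader must know every field's position up front to pick the right ones (naive split; quoted fields and embedded delimiters are not handled):

```java
import java.util.Arrays;

public class CsvPick {
    // Pick a subset of columns by position from a headerless CSV line.
    static String[] pick(String line, int... indices) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        String[] out = new String[indices.length];
        for (int i = 0; i < indices.length; i++) {
            out[i] = fields[indices[i]];       // positional mapping: schema must be known
        }
        return out;
    }

    public static void main(String[] args) {
        // take the first and the fourth of five columns
        String[] picked = pick("a,b,c,d,e", 0, 3);
        System.out.println(Arrays.toString(picked)); // [a, d]
    }
}
```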
Re: How can I convert a DataSet into a Table?
I'm afraid we have no way to do such a conversion in Flink 1.15. But for your case, which is to read from a CSV file in the Table API, you can try as follows: tableEnv.createTemporaryTable("csvInput", TableDescriptor.forConnector("filesystem") .schema(schema) .option("path", "/path/to/file") .format(FormatDescriptor.forFormat("csv") .option("field-delimiter", "|") .build()) .build()); Table table1 = tEnv.from("csvInput").xxx See more in the Flink doc [1]. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api Best regards, Yuxia From: "podunk" To: "User" Sent: Wednesday, July 6, 2022 5:09:54 AM Subject: How can I convert a DataSet into a Table? My code is: package flinkTest2; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class flinkTest2 { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // read a CSV file with five fields, taking only two of them DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file") .includeFields("10010") // take the first and the fourth field .types(String.class, Double.class); //register and create table EnvironmentSettings settings = EnvironmentSettings .newInstance() //.inStreamingMode() .inBatchMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); //Insert CSV content into table, define column names and read some rows from it } } What to do to create a table, insert the DataSet csvInput into the table, and read some rows from it (into a text file)? Thanks for help Mike
Re: How can I convert a DataSet into a Table?
What's the version of Flink you are using? In Flink 1.13, you can use BatchTableEnvironment#fromDataSet() to do that. But since Flink 1.14, the method has been removed. [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.html#fromDataSet-org.apache.flink.api.java.DataSet-java.lang.String- Best regards, Yuxia From: pod...@gmx.com To: "User" Sent: Wednesday, July 6, 2022 5:09:54 AM Subject: How can I convert a DataSet into a Table? My code is: package flinkTest2; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class flinkTest2 { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // read a CSV file with five fields, taking only two of them DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file") .includeFields("10010") // take the first and the fourth field .types(String.class, Double.class); //register and create table EnvironmentSettings settings = EnvironmentSettings .newInstance() //.inStreamingMode() .inBatchMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); //Insert CSV content into table, define column names and read some rows from it } } What to do to create a table, insert the DataSet csvInput into the table, and read some rows from it (into a text file)? Thanks for help Mike
Re: ContinuousFileMonitoringFunction retrieved invalid state.
I'm not sure why it happened. But from the Flink source code, it seems it tried to restore from an invalid state: the state actually contains more than one value, while Flink expects the state to contain one or zero values. Best regards, Yuxia From: "Vishal Surana" To: "User" Sent: Friday, July 1, 2022 5:28:07 AM Subject: ContinuousFileMonitoringFunction retrieved invalid state. My job is unable to restore state after a savepoint due to the following exception. It seems to be a rare exception, as I haven't found any forum discussing it. Please advise. java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction retrieved invalid state. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:167) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] at java.lang.Thread.run(Thread.java:829) ~[?:?] -- Regards, Vishal
Re: StreamingFileSink & checkpoint tuning
The streaming file sink will write to S3 while processing elements, but only as temporary files. Only after one successful checkpoint (more exactly, once it receives the notification of a successful checkpoint) will it commit the temporary files written since the last successful checkpoint. Best regards, Yuxia From: "Xin Ma" To: "User" Sent: Thursday, June 30, 2022, 11:05:51 PM Subject: StreamingFileSink & checkpoint tuning Hi, I recently encountered an issue while using StreamingFileSink. I have a flink job consuming records from various sources and writing to s3 with the streaming file sink. But the job sometimes fails due to checkpoint timeout, and the root cause is checkpoint alignment failure, as there is data skewness between different data sources. I don't want to enable unaligned checkpointing but prefer to do some checkpoint tuning first. My current checkpoint interval is 1 min and the timeout is also 1 min. I wanna increase the tolerable checkpoint failure number to 5, as I believe the unaligned subtasks will definitely update their watermark in 5 minutes. My question is, will the streaming file sink still write to s3 even if the checkpoint fails, or just wait until the next successful checkpoint? (as if we don't tolerate checkpoint failure, the job will simply restart from the last successful checkpoint) Thanks. Best, Kevin
Re: The methodology behind the join in Table API and DataStream
> any way I can both receive the message of both update. I think you may need an outer join[1]. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#outer-equi-join Best regards, Yuxia From: "lan tran" To: "User" Sent: Wednesday, June 29, 2022, 6:04:30 PM Subject: The methodology behind the join in Table API and DataStream Hi team, I have a question about the methodology behind joining using the SQL client and DataStream. I have a scenario like this: I have two tables, t1 and t2; I consume the WAL log from them and send it to Kafka. Next, I join the two tables together and convert the result into a changelog stream. Therefore, if one of the tables is updated, there will be messages. This is how it works if I use the SQL client to join the two tables. However, since DataStream runs behind the Table API according to the doc, I wonder what it would look like if I used DataStream instead of the Table API. In the DataStream API, I'm currently using connect to join the two streams, converting t2 into a broadcast stream with t1 as the main stream. When I update t1, there is an output of the updated record, but when I update t2, there is no update from the broadcast state (even though the state is updated). Therefore, is there any way I can receive the messages of both updates? Do I have to save state for t1 (the main stream), or do I have to change the way I join? Best, Quynh Sent from Mail for Windows
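To receive an updated row when either side changes, the SQL-side equivalent is a full outer join; a minimal sketch, where the table and column names are made up for illustration:

```sql
-- Hypothetical changelog tables t1 and t2 keyed by id: with a full outer
-- join, an update on either input retracts and re-emits the joined row.
SELECT t1.id, t1.val AS v1, t2.val AS v2
FROM t1
FULL OUTER JOIN t2 ON t1.id = t2.id;
```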
Re: Overhead on managing Timers with large number of keys
The short answer is yes. In any case, Flink will spend time/CPU to invoke the timers. Best regards, Yuxia From: "Surendra Lalwani" To: "User" Sent: Wednesday, June 29, 2022, 3:52:32 PM Subject: Overhead on managing Timers with large number of keys Hi Team, I am working with state, using KeyedStreams and process functions. I want to store the number of customers in the state, and I am also registering onTimer for all the customer_ids. I wanted to understand: if we register something around 70 million keys and we have onTimer registered for those keys, will there be any overhead, and if yes, what will it be? Will it impact processing time? Thanks and Regards, Surendra Lalwani
Re: How to make current application cdc
> I mean CDC should be handled on the Kafka side. What do you mean by that? Do you mean that Kafka should store the messages in a CDC format like Debezium[1], Canal[2], Maxwell[3], or OGG[4]? > Or should I need to use Table API I'm afraid not. It seems you can still use the Flink DataStream API, as the Table API makes no difference for your case. BTW, you can try Flink CDC[5]. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/ [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/ [3] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/maxwell/ [4] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/ogg/ [5] https://ververica.github.io/flink-cdc-connectors/ Best regards, Yuxia From: "Sid" To: "User" Sent: Saturday, June 25, 2022, 6:32:22 PM Subject: How to make current application cdc Hello, I have a current flow where the data from the Flink-Kafka connector is captured, processed using the Flink DataStream API, and stored in Kafka topics. However, I would like to make it CDC enabled. I went through an article where it was mentioned that it should be handled on the Kafka side while capturing the data. I mean, CDC should be handled on the Kafka side. Or should I need to use the Table API? So, any ideas/links are much appreciated as I am trying to understand these concepts. TIA, Sid
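To make the idea of a CDC format concrete, here is a sketch of option [1]: a Kafka table whose messages are Debezium-JSON change events (the topic and column names are hypothetical). Flink then interprets each message as an INSERT/UPDATE/DELETE rather than a plain record.

```sql
-- Hypothetical topic 'products_cdc' carrying Debezium-JSON envelopes.
CREATE TABLE products (
  id BIGINT,
  name STRING,
  price DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_cdc',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
```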
Re: Apache Flink - Reading data from Scylla DB
It seems you may need to implement a custom connector for Scylla DB, as I haven't found an existing one. Hope the docs[1][2] can help you implement your own connector. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/ [2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html Best regards, Yuxia From: "Himanshu Sareen" To: "User" Sent: Tuesday, June 14, 2022, 11:29:38 AM Subject: Apache Flink - Reading data from Scylla DB Team, I'm looking for a solution to consume/read data from Scylla DB into Apache Flink. If anyone can guide me or share pointers, it will be helpful. Regards, Himanshu
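For reference, once such a connector is implemented and its factory identifier registered (as described in doc [1] above), it would be used from SQL roughly like this. Everything below is hypothetical: Flink ships no 'scylla' connector, and all option names are invented for illustration.

```sql
-- Hypothetical: assumes a user-implemented DynamicTableSource factory
-- registered under the identifier 'scylla'.
CREATE TABLE scylla_events (
  user_id STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'scylla',
  'hosts' = 'node1:9042',
  'keyspace' = 'my_keyspace',
  'table' = 'events'
);
```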
Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.
Have you ever unzipped your project jar and made sure the class HiveParserFactory exists? Best regards, Yuxia From: "顾斌杰" To: luoyu...@alumni.sjtu.edu.cn Cc: "User" Sent: Wednesday, June 8, 2022, 5:11:33 PM Subject: Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. can refer to this: https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto On 6/8/2022 16:04, yuxia wrote: Have you ever put the flink-sql-connector-hive jar into your FLINK_HOME/lib? And make sure your JM/TM also contains the jar. Best regards, Yuxia From: "顾斌杰" To: "User" Sent: Wednesday, June 8, 2022, 3:19:19 PM Subject: Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. The following is part of the code: String createKafkaSql = "create table if not exists x" + "(x\n" + ",update_time timestamp(3) comment '11'\n" + ",watermark for update_time as update_time - interval '20' second)\n" + "with ('connector' = 'kafka'\n" + ",'topic' = '" + topic + "'\n" + ",'properties.bootstrap.servers' = '" + bootstrapServers + "'\n" + ",'properties.group.id' = 'flink_sql_tyc_company_info'\n" + ",'scan.startup.mode' = 'earliest-offset'\n" + ",'format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true')"; tEnv.executeSql(createKafkaSql); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); String CreateHiveSql = "create table if not exists " + "()\n" + "partitioned by (op_day string comment '111')\n" + "stored as orc\n" + "tblproperties('partition.time-extractor.timestamp-pattern'='$op_day'\n" + ",'sink.partition-commit.trigger'='partition-time'\n" + ",'sink.partition-commit.delay'='1h'\n" + 
",'sink.partition-commit.policy.kind'='metastore,success-file')"; tEnv.executeSql(CreateHiveSql); tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); String insert = "insert into x\n" + "select `x" + ",date_format(update_time,'-MM-dd')\n" + "from x"; tEnv.executeSql(insert); On 6/8/2022 15:14, 顾斌杰 wrote: Flink version: 1.13 When executed in the local environment (windows), there is no exception. When starting the project with the flink web ui, I get the following error: Server Response: org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application. at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application. 
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ... 7 more Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application. at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88) at org.apache.flink.client.
Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.
Have you ever put the flink-sql-connector-hive jar into your FLINK_HOME/lib? And make sure your JM/TM also contains the jar. Best regards, Yuxia From: "顾斌杰" To: "User" Sent: Wednesday, June 8, 2022, 3:19:19 PM Subject: Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. The following is part of the code: String createKafkaSql = "create table if not exists x" + "(x\n" + ",update_time timestamp(3) comment '11'\n" + ",watermark for update_time as update_time - interval '20' second)\n" + "with ('connector' = 'kafka'\n" + ",'topic' = '" + topic + "'\n" + ",'properties.bootstrap.servers' = '" + bootstrapServers + "'\n" + ",'properties.group.id' = 'flink_sql_tyc_company_info'\n" + ",'scan.startup.mode' = 'earliest-offset'\n" + ",'format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true')"; tEnv.executeSql(createKafkaSql); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); String CreateHiveSql = "create table if not exists " + "()\n" + "partitioned by (op_day string comment '111')\n" + "stored as orc\n" + "tblproperties('partition.time-extractor.timestamp-pattern'='$op_day'\n" + ",'sink.partition-commit.trigger'='partition-time'\n" + ",'sink.partition-commit.delay'='1h'\n" + ",'sink.partition-commit.policy.kind'='metastore,success-file')"; tEnv.executeSql(CreateHiveSql); tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); String insert = "insert into x\n" + "select `x" + ",date_format(update_time,'-MM-dd')\n" + "from x"; tEnv.executeSql(insert); On 6/8/2022 15:14, 顾斌杰 wrote: Flink version: 1.13 When executed in the local environment (windows), there is no exception. When starting the project with the flink web ui, I get the following error: Server Response: org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application. 
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ... 7 more Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application. at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88) at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 
7 more Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. Reason: Required context properties mismatch. The following properties are requested: table.sql-dialect=hive The following factories have been considered: org.apache.flink.table.planner.delegation.Defau
Re: slack invite link
I have sent the invitation to the email address shmily...@gmail.com. Please check your email! Looking forward to your joining. Best regards, Yuxia From: "shmily" To: "User" Sent: Sunday, June 5, 2022, 4:55:11 PM Subject: slack invite link hi, can someone please send me a slack invite link? the one provided by the community has expired~ many thanks!
Re: Can we resume a job from a savepoint from Java api?
Hope the unit test SavepointITCase#testCanRestoreWithModifiedStatelessOperators[1] in the Flink repo can help you. [1] https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java#L1228 Best regards, Yuxia From: "Qing Lim" To: "User" Sent: Wednesday, June 1, 2022, 7:46:59 PM Subject: Can we resume a job from a savepoint from Java api? Hi, is it possible to resume a job from a savepoint in Java code? I wish to test failure recovery in my test code. I am thinking to simulate failure recovery by saving state to a savepoint and then recovering from it; is this possible with a local MiniCluster setup? Kind regards
Re: Status of File Sink Common (flink-file-sink-common)
I'm afraid not. I can still find it in the main repository[1]. [1] https://github.com/apache/flink/tree/master/flink-connectors/flink-file-sink-common Best regards, Yuxia ----- Original Message ----- From: "Jun Qin" To: "User" Sent: Tuesday, May 31, 2022, 5:24:10 AM Subject: Status of File Sink Common (flink-file-sink-common) Hi, Has File Sink Common (flink-file-sink-common) been dropped? If so, since which version? I do not seem to find anything related in the release notes of 1.13.x, 1.14.x and 1.15.0. Thanks Jun
Re: Large backpressure and slow checkpoints in StateFun
Maybe you can use jstack or a flame graph to analyze where the bottleneck is. BTW, for generating flame graphs, arthas[1] is a good tool. [1] https://github.com/alibaba/arthas Best regards, Yuxia From: "Christopher Gustafson" To: "User" Sent: Monday, May 30, 2022, 2:29:19 PM Subject: Large backpressure and slow checkpoints in StateFun Hi, I am running some benchmarks using StateFun and have encountered a problem with backpressure and slow checkpoints that I can't figure out the reason for, and was hoping that someone might have an idea of what is causing it. My setup is the following: I am running the Shopping Cart application from the StateFun playground. The job is submitted as an uber jar to an existing Flink cluster with 3 TaskManagers and 1 JobManager. The functions are served using the Undertow example from the documentation, and I am using Kafka ingresses and egresses. My workload is only at 1000 events/s. Everything is run in separate GCP VMs. The issue is the very long checkpoints, which I assume are caused by a backpressured ingress, caused in turn by the function dispatcher operator not being able to handle the workload. The only thing that has helped so far is to increase the parallelism of the job, but it feels like there still is some other bottleneck causing the issues. I have seen other benchmarks reaching much higher throughput than 1000 events/s, without more CPU or memory resources than I am using. Any ideas of bottlenecks or ways to figure them out are greatly appreciated. Best Regards, Christopher Gustafson
Re: Exception when running Java UDF with Blink table planner
It seems an exception is thrown when Flink tries to deserialize the object output by your udf. So, is the object produced by your udf serializable? Does it contain any lambda in the object/class? Best regards, Yuxia From: "Tom Thornton" To: "User" Sent: Friday, May 27, 2022, 6:47:04 AM Subject: Exception when running Java UDF with Blink table planner We are migrating from the legacy table planner to the Blink table planner. Previously we had a UDF defined like this that worked without issue:

public class ListToString extends DPScalarFunction {
    public String eval(List list) {
        return "foo";
    }
}

Since moving to the Blink table planner, we receive this error: Caused by: org.apache.flink.table.api.ValidationException: Given parameters of function 'ListToString' do not match any signature. Actual: (java.lang.String[]) Expected: (java.util.List) We refactored the UDF to take an Object[] as input, to match what is received from Blink:

public class ListToString extends DPScalarFunction {
    public String eval(Object[] arr) {
        return "foo";
    }
}

Now the UDF always fails (including for the simplified example above where we return a constant string regardless of input). 
For example, when we run on a query like this one: SELECT ListToString(`col1`) as col1_string FROM `table` Produces an IndexOutOfBoundsException: Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for length 0 at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248) at java.base/java.util.Objects.checkIndex(Objects.java:372) at java.base/java.util.ArrayList.get(ArrayList.java:459) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570) at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64) at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700) at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683) at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175) at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104) at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128) at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070) at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406) at StreamExecCalc$337.processElement(Unknown Source) 
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at SourceConversion$328.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) at org.apache.fl
Re: length value for some classes extending LogicalType.
IMO, the behavior depends on how you convert your string data from the external system to Flink's internal data, or conversely. I think it's more like a hint telling how to convert the string data to and from the external system, including source and sink. Best regards, Yuxia From: "Krzysztof Chmielewski" To: "User" Sent: Wednesday, May 25, 2022, 5:29:10 PM Subject: length value for some classes extending LogicalType. Hi, some classes extending LogicalType.java, such as VarCharType, BinaryType, CharType and a few others, have an optional argument "length". If not specified, the length is set to a default value, which is 1. I would like to ask what the implications of that are. What can happen if I use the default length value 1 but the actual length of the data is bigger than 1? For example: RowType.of("col1", new CharType()) <- this will use the default length value 1. Regards, Krzysztof Chmielewski
Re: OutputTag alternative with pyflink 1.15.0
Yes, you're right. Fortunately, the master branch has supported it[1], but it hasn't been released yet. If you want to use output tags in Python with 1.15, you can apply this patch[2] to your Flink 1.15 and build it by yourself[3]. BTW, if you don't want to bother building, you can use the Java/Scala API. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/ [2] https://github.com/apache/flink/pull/19453 [3] https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/ Best regards, Yuxia From: "Lakshya Garg" To: "User" Sent: Monday, May 23, 2022, 12:02:26 PM Subject: OutputTag alternative with pyflink 1.15.0 Hi Everyone, I see that in pyflink 1.15.0 there isn't support for OutputTag to redirect messages to other output streams. Is this understanding right? If yes, what can be the alternative for this? Any example or reference link would be helpful. Lakshya
Re: Incorrect checkpoint id used when job is recovering
There's a similar issue, FLINK-19816[1]. [1] https://issues.apache.org/jira/browse/FLINK-19816 Best regards, Yuxia From: "tao xiao" To: "User" Sent: Thursday, May 19, 2022, 9:16:34 PM Subject: Re: Incorrect checkpoint id used when job is recovering Hi team, Can anyone shed some light? On Sat, May 14, 2022 at 8:56 AM tao xiao <xiaotao...@gmail.com> wrote: Hi team, Does anyone have any ideas? On Thu, May 12, 2022 at 9:20 PM tao xiao <xiaotao...@gmail.com> wrote: Forgot to mention the Flink version is 1.13.2 and we use kubernetes native mode On Thu, May 12, 2022 at 9:18 PM tao xiao <xiaotao...@gmail.com> wrote: Hi team, I met a weird issue when a job tries to recover from a JM failure. The last successful checkpoint before the JM crashed was 41205 ``` {"log":"2022-05-10 14:55:40,663 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed checkpoint 41205 for job (9453840 bytes in 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"} ``` However the JM tries to recover the job with an old checkpoint 41051, which doesn't exist, and that leads to an unrecoverable state ``` "2022-05-10 14:59:38,949 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 41051.\n" ``` Full log attached -- Regards, Tao -- Regards, Tao -- Regards, Tao
Re: Incompatible data types while using firehose sink
Firehose implements Sink V2, which was introduced in Flink 1.15, but the method inputStream.sinkTo(xxx) only accepts Sink V1 in Flink 1.13. If you still want to use Firehose in Flink 1.13, I guess you may need to implement a SinkV2Adapter that translates Sink V2 into Sink V1 (like the SinkV1Adapter in Flink 1.15), or rewrite some code of the Firehose connector to migrate it to Sink V1. Best regards, Yuxia From: "Zain Haider Nemati" To: "Martijn Visser" Cc: "yu'an huang", "User" Sent: Thursday, May 12, 2022, 3:36:46 PM Subject: Re: Incompatible data types while using firehose sink Hi, Appreciate your response. My flink version is 1.13. Is there any other way to sink data to kinesis without having to update to 1.15? On Thu, May 12, 2022 at 12:25 PM Martijn Visser <martijnvis...@apache.org> wrote: I'm guessing this must be Flink 1.15 since Firehose was added in that version :) On Thu, 12 May 2022 at 08:41, yu'an huang <h.yuan...@gmail.com> wrote: Hi, Your code is working fine on my computer. What is the Flink version you are using? On 12 May 2022, at 3:39 AM, Zain Haider Nemati <zain.hai...@retailo.co> wrote: Hi Folks, Getting this error when sinking data to a firehose sink, would really appreciate some help! 
```java
DataStream inputStream = env.addSource(
        new FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));

Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");

KinesisFirehoseSink kdfSink = KinesisFirehoseSink.builder()
        .setFirehoseClientProperties(sinkProperties)
        .setSerializationSchema(new SimpleStringSchema())
        .setDeliveryStreamName("xxx")
        .setMaxBatchSize(350)
        .build();

inputStream.sinkTo(kdfSink);
```

The compiler error is:

```
incompatible types: org.apache.flink.connector.firehose.sink.KinesisFirehoseSink cannot be converted to org.apache.flink.api.connector.sink.Sink
```
Re: http stream as input data source
The quick answer is no, there's no HTTP data stream on hand. You can implement one by yourself. Here[1] is a guide on how to implement user-defined sources & sinks. By the way, there's a JIRA for an HTTP sink[2], but it is marked as won't fix.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
[2] https://issues.apache.org/jira/browse/FLINK-8047

Best regards,
Yuxia

From: "Harald Busch"
To: "User"
Sent: Thursday, May 12, 2022, 11:37:29 AM
Subject: http stream as input data source

Hi, is there an HTTP data stream as a data source? I only see socketTextStream and other predefined stream sources. It seems that I have to use fromCollection, fromElements ... and prepare the collection myself.

Thanks
Regards
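For context, a user-defined streaming source ultimately boils down to a loop that fetches from the external system and emits records. The sketch below is plain Java with no Flink dependency; `HttpPollingLoop` and the injected `Supplier` are hypothetical names for illustration, not Flink API. In a real user-defined source, the fetch would wrap an HTTP client such as `java.net.http.HttpClient` and records would be emitted to Flink's source context instead of being collected into a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: the core of a hand-rolled HTTP source is a polling
// loop that repeatedly fetches a payload and emits each result downstream.
// The fetch is injected as a Supplier so the loop itself stays testable.
class HttpPollingLoop {
    private final Supplier<String> fetch; // e.g. wraps an HTTP GET

    HttpPollingLoop(Supplier<String> fetch) {
        this.fetch = fetch;
    }

    // Poll a fixed number of times and collect the results. A real source
    // would loop until cancelled and emit to a SourceContext instead.
    List<String> poll(int times) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            out.add(fetch.get());
        }
        return out;
    }
}
```

The Supplier indirection keeps the HTTP transport swappable, which is also roughly the seam a real source implementation would use for testing.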
Re: unsubscribe
To unsubscribe, you can send an email to user-unsubscr...@flink.apache.org with any subject.

Best regards,
Yuxia

From: "Henry Cai"
To: "User"
Sent: Thursday, May 12, 2022, 1:14:43 AM
Subject: unsubscribe

unsubscribe
Re: How can I set job parameter in flink sql
Hi, AFAIK, you can't get a parameter set via the Flink SQL client in a UDF. If you still want to get the parameters in your UDF, you can use the following code to set the parameter:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map<String, String> parameters = new HashMap<>();
parameters.put("black_list_path", "/root/list.properties");
env.getConfig().setGlobalJobParameters(Configuration.fromMap(parameters));
```

Then, you can get the parameter in your UDF using:

```java
context.getJobParameter("black_list_path", "/config/list.properties");
```

Best regards,
Yuxia

From: "wang" <24248...@163.com>
To: "User", "user-zh"
Sent: Wednesday, May 11, 2022, 2:44:20 PM
Subject: How can I set job parameter in flink sql

Hi dear engineer, I want to override the function open() in my UDF, like:

```java
public class BlackListConvertFunction extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        String path = context.getJobParameter("black_list_path", "/config/list.properties");
        System.out.println(path);
    }

    public Double eval(String scores) {
        // some logics
        return 0.0;
    }
}
```

In the open() function, I want to fetch the configured value of "black_list_path", then simply print that value out. And I configure this value in the ./sql-client.sh console:

SET black_list_path = /root/list.properties

Then I run this UDF, but what is printed is /config/list.properties (the default value I set in context.getJobParameter("black_list_path", "/config/list.properties")), not /root/list.properties, which I set in the ./sql-client.sh console. So could you please show me the correct way to set black_list_path in SQL? Thanks so much!

Thanks && Regards,
Hunk
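To illustrate why the default value comes back here: getJobParameter is essentially a map lookup with a fallback, and a SET in the SQL client does not populate the global-job-parameters map, so the lookup falls through to the default. The toy class below is a plain-Java sketch of that semantics (`JobParameters` is a hypothetical name, not Flink API).

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of global-job-parameter lookup: a plain map with a default
// fallback. A key only resolves if it was explicitly put into the map
// (i.e. registered via setGlobalJobParameters); otherwise the default wins.
class JobParameters {
    private final Map<String, String> params = new HashMap<>();

    void set(String key, String value) {
        params.put(key, value);
    }

    String getJobParameter(String key, String defaultValue) {
        return params.getOrDefault(key, defaultValue);
    }
}
```

In the question above, the map was never populated with "black_list_path", which is why the default "/config/list.properties" was printed.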
Re: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder
Not exactly: the Flink distribution just doesn't include the Scala API by default. To use Scala, you can pack both flink-table-api-scala and flink-table-api-scala-bridge into your job jar and keep the Flink distribution structure as-is, which means flink-table-planner_2.12-1.15.0.jar stays under the opt folder and flink-table-planner-loader stays under /lib. That way, you can choose whatever Scala version you prefer. Alternatively, you can swap flink-table-planner_2.12-1.15.0.jar and flink-table-planner-loader, but then you are bound to Flink's Scala 2.12 version.

Best regards,
Yuxia

From: "Jeff Zhang"
To: "yuxia"
Cc: "User"
Sent: Sunday, May 8, 2022, 3:50:33 PM
Subject: Re: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder

Thanks Yuxia, that works. Does that mean for one Flink distribution, I can either use Java or use Scala? If so, it seems not user friendly.

On Sun, May 8, 2022 at 10:40 AM yuxia < luoyu...@alumni.sjtu.edu.cn > wrote:

Hi, you can move the flink-table-planner-loader to /opt. See more in https://issues.apache.org/jira/browse/FLINK-25128

Best regards,
Yuxia

From: "Jeff Zhang" < zjf...@gmail.com >
To: "User" < user@flink.apache.org >
Sent: Saturday, May 7, 2022, 10:05:55 PM
Subject: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder

Hi folks, it looks like Flink 1.15 changed its binary distribution because it became Scala-free. The flink-table-planner_2.12-1.15.0.jar is put under the opt folder. Now I would like to use it for my Scala Flink app, so I moved it to the lib folder, but after that, I cannot start sql-client. Is that expected? Here's the error I see:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
```
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
    at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
    at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'default' that implement 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.

Ambiguous factory classes are:

org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.loader.DelegateExecutorFactory

    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
    ... 8 more
```

--
Best Regards
Jeff Zhang
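The "Multiple factories for identifier 'default'" failure in the trace above comes from classpath factory discovery finding two ExecutorFactory implementations (one from each planner jar) where exactly one is expected. The sketch below is plain Java that mirrors that rule; the class and method names are illustrative, not Flink's actual FactoryUtil code.

```java
import java.util.List;

// Illustrative model of classpath factory discovery: collect every
// implementation registered for an identifier, then require exactly one.
// Having both flink-table-planner and flink-table-planner-loader in /lib
// yields two matches, which is rejected as ambiguous.
class FactoryDiscovery {
    static String discoverFactory(List<String> matches) {
        if (matches.isEmpty()) {
            throw new IllegalStateException("No factory found");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Ambiguous factories: " + matches);
        }
        return matches.get(0);
    }
}
```

This is why the fix is to keep exactly one of the two planner jars under /lib and the other under /opt.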
Re: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder
Hi, you can move the flink-table-planner-loader to /opt. See more in https://issues.apache.org/jira/browse/FLINK-25128

Best regards,
Yuxia

From: "Jeff Zhang"
To: "User"
Sent: Saturday, May 7, 2022, 10:05:55 PM
Subject: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder

Hi folks, it looks like Flink 1.15 changed its binary distribution because it became Scala-free. The flink-table-planner_2.12-1.15.0.jar is put under the opt folder. Now I would like to use it for my Scala Flink app, so I moved it to the lib folder, but after that, I cannot start sql-client. Is that expected? Here's the error I see:

```
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
    at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
    at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'default' that implement 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.

Ambiguous factory classes are:

org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.loader.DelegateExecutorFactory

    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
    ... 8 more
```

--
Best Regards
Jeff Zhang
Re: How to return JSON Object from UDF
Does a DataTypeHint with bridgedTo meet your requirements? For example:

```java
public @DataTypeHint(
        value = "RAW",
        bridgedTo = JSONObject.class,
        rawSerializer = JSONObjectSerializer.class)
JSONObject eval(String str) {
    return JSONObject.parse(str);
}
```

You may need to provide a class like JSONObjectSerializer that extends TypeSerializerSingleton.

Best regards,
Yuxia

From: "Surendra Lalwani"
To: "User"
Sent: Friday, May 6, 2022, 4:40:19 PM
Subject: How to return JSON Object from UDF

Hi Team, I am using Flink 1.13.6 and I have created a UDF from which I want to return a JSONObject, or basically an Object, but it doesn't seem to work as there is no DataTypeHint compatible with Object. In earlier Flink versions, before DataTypeHint existed, it used to work. Any help would be appreciated.

Thanks and Regards,
Surendra Lalwani
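At its core, the custom raw serializer has to turn the JSON value into bytes and read it back. The self-contained sketch below shows that round trip in plain Java; `JsonStringRawSerializer` is a hypothetical name, and it works on the JSON string rather than a real JSONObject to avoid the fastjson dependency. A real TypeSerializerSingleton would wrap logic like this behind Flink's DataOutputView/DataInputView interfaces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of the work a raw serializer does for a JSON value:
// write it as a length-prefixed UTF-8 string, and read the same bytes back.
class JsonStringRawSerializer {

    byte[] serialize(String json) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(json); // length-prefixed UTF-8 encoding
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    String deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The round trip must be lossless, because Flink uses the serializer for state and network transfer of RAW-typed values.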