SQL null 过滤的问题
flink 版本:1.12 列:col varchar 使用where col is null时可以过滤出col为null的记录 使用where col is null or col = ''时就不可以 同时试了下另外一种写法 where (case when col is null then true else false end) 可以过滤出来 where (case when col is null then true when col = '' then true else false end) 过滤不出来 请问这个bug吗,还是语法有问题
Re: flink sql聚合后collect收集数据问题
Hi! 1 & 2. multiset 不能转成 array。可以考虑使用 listagg + split_index + limit 语句达成需要的效果。当然更方便的还是写一个 UDF。 3. window top-n 可以使用 1.13 新引入的 window tvf: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-topn/ casel.chen 于2021年8月11日周三 下午5:06写道: > 源表三个字段 name, color, ts > 按时间窗口聚合后想根据name group by取colors数组 > > > create table source_table ( >name STRING, >color STRING, >ts TIMESTAMP, >WATERMARK ts for ts > ) > > > create table sink_table ( > name STRING, >colors ARRAY > ) > > > 1. 请问这个select语句要怎么写? > select name, collect(color) as colors from source_table group by > tumble(ts, interval '5' seconds) > 这里collect(color)返回的是multiset类型,怎样转成Array类型呢? > > > 2. 如果array元素很多,我只想取其中N个,该怎么写flink sql? > > 3, 若取出现次数最多的前N个,又该怎么写flink sql? > select name, collect(color) as colors from ( > select name, color from ( > select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY color_cnt desc) > AS row_num from ( > select name, color, count(*) as color_cnt group by name, color, tumble(ts, > interval '5' seconds) > ) > ) where row_num < 5 > ); > 是这样写么?
Flink HIve 文件,上游Kafak数据很大的时候,无法完成checkpoint
您好,Flink Hive 当上游的Kafka数据量特别大的时候,发现checkpoint一直无法完成,5分钟后报错了。请问这个问题要怎么解决
flink sql聚合后collect收集数据问题
源表三个字段 name, color, ts 按时间窗口聚合后想根据name group by取colors数组 create table source_table ( name STRING, color STRING, ts TIMESTAMP, WATERMARK ts for ts ) create table sink_table ( name STRING, colors ARRAY ) 1. 请问这个select语句要怎么写? select name, collect(color) as colors from source_table group by tumble(ts, interval '5' seconds) 这里collect(color)返回的是multiset类型,怎样转成Array类型呢? 2. 如果array元素很多,我只想取其中N个,该怎么写flink sql? 3, 若取出现次数最多的前N个,又该怎么写flink sql? select name, collect(color) as colors from ( select name, color from ( select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY color_cnt desc) AS row_num from ( select name, color, count(*) as color_cnt group by name, color, tumble(ts, interval '5' seconds) ) ) where row_num < 5 ); 是这样写么?
有没有flink on k8s operator相关的中文使用资料
最近想了解怎么使用flink on k8s operator,查了下业内有lyft和google的,有没有这方面的中文使用资料? 另外想知道ververica platform是不是也是基于flink on k8s operator开发的?社区版是否开源以支持二次开发?
Re: Flink HIve 文件压缩报错
目前没有办法让作业继续跑,只能重跑了 这里有几个不同的问题: 1. 文件是如何被删掉的,两种可能:被Flink误删(需要在Flink这边修复)、或者被Flink之外的进程删掉 2. 文件丢失意味着exactly once语义无法保证了,这种情况是否可以接受 3. 如果可以接受丢失数据,如何让Flink作业可以继续跑(需要Flink的改进) On Wed, Aug 11, 2021 at 7:52 PM 周瑞 wrote: > 您好: > 这个文件确实不存在了,这种情况目前怎样设置可以让作业继续跑 > > > --Original-- > From: "Rui Li" Date: Wed, Aug 11, 2021 07:49 PM > To: "user-zh" > Subject: Re: Flink HIve 文件压缩报错 > > > > 这个文件是确实不在了么?是不是被别的进程删掉了呢,可以通过hdfs的audit log来判断一下。 > > 目前flink这边写文件的exactly > once语义是依赖HDFS的一致性保证的,如果之前写到HDFS的数据丢掉了就会破坏这个语义了(不过我们可以考虑在这种情况下让作业能继续跑)。 > > On Tue, Aug 10, 2021 at 7:45 PM 周瑞 > 您好:Flink > > 写入Hive的时候,在压缩文件的时候有个待压缩的文件丢失了,导致Flink程序一直在不断重启,请问文件丢失是什么原因导致的,这种情况怎么能够让Flink程序正常启动 > 2021-08-10 19:34:19 java.io.UncheckedIOException: > java.io.FileNotFoundException: File does not exist: > > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) > at > > org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38) > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173) > at > java.util.HashMap.forEach(HashMap.java:1288) > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169) > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151) > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141) > at > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) > at org.apache.flink.streaming.runtime.io > .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > at > org.apache.flink.streaming.runtime.io > .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at org.apache.flink.streaming. > runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > at > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at > java.lang.Thread.run(Thread.java:748) Caused by: > java.io.FileNotFoundException: File does not exist: > > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > at > > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583) > at > > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576) > at > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591) > at > > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161) > ... 19 more > > > > -- > Best regards! > Rui Li -- Best regards! Rui Li
Re: Flink HIve 文件压缩报错
您好: 这个文件确实不存在了,这种情况目前怎样设置可以让作业继续跑 --Original-- From: "Rui Li"
Re: Flink HIve 文件压缩报错
这个文件是确实不在了么?是不是被别的进程删掉了呢,可以通过hdfs的audit log来判断一下。 目前flink这边写文件的exactly once语义是依赖HDFS的一致性保证的,如果之前写到HDFS的数据丢掉了就会破坏这个语义了(不过我们可以考虑在这种情况下让作业能继续跑)。 On Tue, Aug 10, 2021 at 7:45 PM 周瑞 wrote: > 您好:Flink > 写入Hive的时候,在压缩文件的时候有个待压缩的文件丢失了,导致Flink程序一直在不断重启,请问文件丢失是什么原因导致的,这种情况怎么能够让Flink程序正常启动 > 2021-08-10 19:34:19 java.io.UncheckedIOException: > java.io.FileNotFoundException: File does not exist: > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) >at > org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173) >at java.util.HashMap.forEach(HashMap.java:1288) at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at > java.lang.Thread.run(Thread.java:748) Caused by: > java.io.FileNotFoundException: File does not exist: > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 >at > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583) > at > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591) > at > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161) >... 19 more -- Best regards! Rui Li
Re:Re: Flink SQL向下兼容吗?
如果只是数据同步作业,例如从kafka消费将数据存入下游db,这种弱“状态”作业能跨版本兼容么? 在 2021-08-11 16:54:56,"Leonard Xu" 写道: >这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级, >DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的 >DDL的, >只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供 。 > >所以我理解你关心的兼容性问题是不存在的,但请注意如果你的SQL作业是有状态的,需要带状态升级,这些状态都是跨版本不兼容的。 > >祝好, >Leonard > >> 在 2021年8月10日,11:44,Jason Lee 写道: >> >> 各位大佬好, >> >> >> 请教一个问题,我们最近打算升级Flink 版本,想问一下升级之后的已有任务的SQL会兼容到新版本吗? >> 比如我升级到1.13,那我1.10的SQL语法能被兼容吗? >> >> >> 感恩 >> >> >> | | >> Chuang Li >> | >> | >> jasonlee1...@163.com >> | >> 签名由网易邮箱大师定制 >> >
flink sql聚合后collect收集数据问题
源表三个字段 name, color, ts 按时间窗口聚合后想根据name group by取colors数组 create table source_table ( name STRING, color STRING, ts TIMESTAMP, WATERMARK ts for ts ) create table sink_table ( name STRING, colors ARRAY ) 1. 请问这个select语句要怎么写? select name, collect(color) as colors from source_table group by tumble(ts, interval '5' seconds) 这里collect(color)返回的是multiset类型,怎样转成Array类型呢? 2. 如果array元素很多,我只想取其中N个,该怎么写flink sql? 3, 若取出现次数最多的前N个,又该怎么写flink sql? select name, collect(color) as colors from ( select name, color from ( select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY color_cnt desc) AS row_num from ( select name, color, count(*) as color_cnt group by name, color, tumble(ts, interval '5' seconds) ) ) where row_num < 5 ); 是这样写么?
Re: Flink SQL向下兼容吗?
这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级, DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的 DDL的, 只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供 。 所以我理解你关心的兼容性问题是不存在的,但请注意如果你的SQL作业是有状态的,需要带状态升级,这些状态都是跨版本不兼容的。 祝好, Leonard > 在 2021年8月10日,11:44,Jason Lee 写道: > > 各位大佬好, > > > 请教一个问题,我们最近打算升级Flink 版本,想问一下升级之后的已有任务的SQL会兼容到新版本吗? > 比如我升级到1.13,那我1.10的SQL语法能被兼容吗? > > > 感恩 > > > | | > Chuang Li > | > | > jasonlee1...@163.com > | > 签名由网易邮箱大师定制 >
Re: 退订
如果需要取消订阅 user-zh@flink.apache.org 邮件组,请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org Best, Leonard > 在 2021年8月6日,10:49,汪嘉富 写道: > > 退订 >
Re: 退订
如果需要取消订阅 user-zh@flink.apache.org 邮件组,请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org Best, Leonard > 在 2021年8月11日,08:16,Lee2097 写道: > > 退订
????
Flink HIve 数据写入后查询无数据
您好:Flink 写入Hive的时候数据已经写进去了,但是中间发生了异常导致这里的文件没有compact,Hive数据表查不出数据,后续的Flink程序启动后由于数据不再写入这个分区。导致该分区的数据一直无法compact.请问这种异常需要怎么解决,有手动修复的方法么?
Re:Re:Re: cumulate 不能和比较函数连用
抱歉,sql语句是我手打的,没注意到,我确定和这个descriptor没关系。我去掉where条件就能正常运行,同时,我测试in,not in 函数的时候,会报同样的错误。 At 2021-08-11 13:51:16, "李航飞" wrote: >org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - >Could not execute application: >org.apache.flink.client.program.ProgramInvocationException: The main >method caused an error: Currently Flink doesn't support individual window >table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[1 >min]). Please use window table-valued function with aggregate together >using window_start and window_end as group keys.at >org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) > ~[flink-clients_2.12-1.13.1.jar:1.13.1]at >org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > ~[flink-clients_2.12-1.13.1.jar:1.13.1] at >org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) >~[flink-clients_2.12-1.13.1.jar:1.13.1]at >org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) > ~ >在 2021-08-11 12:44:38,"Caizhi Weng" 写道: >>Hi! >> >>descriptor 拼错了吧。我在本地没有复现这个问题,Flink 版本是多少呢? >> >>李航飞 于2021年8月11日周三 上午11:41写道: >> >>> sql语句如下: >>> select count(clicknum) as num >>> >>> from table( >>> >>> cumulate(table testTable, desctiptor(crtTime),interval '1'minutes, >>> interval '10' minutes)) >>> >>> where clicknum <>'-99' >>> >>> group by window_start,window_end >>> >>> >>> 报错 信息: >>> Flink doesn't support individual window table-valued function >>> cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]... >>> >>> >>> 请问如何解决,谢谢