SQL null 过滤的问题

2021-08-11 文章 silence
flink 版本:1.12
列:col varchar
使用where col is null时可以过滤出col为null的记录
使用where col is null or col = ''时就不可以
同时试了下另外一种写法
where (case when col is null then true else false end) 可以过滤出来
where (case when col is null then true when col = '' then true else false end) 
过滤不出来

请问这个bug吗,还是语法有问题



Re: flink sql聚合后collect收集数据问题

2021-08-11 文章 Caizhi Weng
Hi!

1 & 2. multiset 不能转成 array。可以考虑使用 listagg + split_index + limit
语句达成需要的效果。当然更方便的还是写一个 UDF。

3. window top-n 可以使用 1.13 新引入的 window tvf:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-topn/

casel.chen  于2021年8月11日周三 下午5:06写道:

> 源表三个字段  name, color, ts
> 按时间窗口聚合后想根据name group by取colors数组
>
>
> create table source_table (
>name  STRING,
>color  STRING,
>ts TIMESTAMP,
>WATERMARK ts for ts
> )
>
>
> create table sink_table (
>   name  STRING,
>colors  ARRAY
> )
>
>
> 1. 请问这个select语句要怎么写?
> select name, collect(color) as colors from source_table group by
> tumble(ts, interval '5' seconds)
> 这里collect(color)返回的是multiset类型,怎样转成Array类型呢?
>
>
> 2. 如果array元素很多,我只想取其中N个,该怎么写flink sql?
>
> 3, 若取出现次数最多的前N个,又该怎么写flink sql?
> select name, collect(color) as colors from (
> select name, color from (
>   select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY color_cnt desc)
> AS row_num from (
> select name, color, count(*) as color_cnt group by name, color, tumble(ts,
> interval '5' seconds)
>   )
> ) where row_num < 5
> );
> 是这样写么?


Flink HIve 文件,上游Kafak数据很大的时候,无法完成checkpoint

2021-08-11 文章 周瑞
您好,Flink Hive 当上游的Kafka数据量特别大的时候,发现checkpoint一直无法完成,5分钟后报错了。请问这个问题要怎么解决

flink sql聚合后collect收集数据问题

2021-08-11 文章 casel.chen
源表三个字段  name, color, ts
按时间窗口聚合后想根据name group by取colors数组


create table source_table (
   name  STRING,
   color  STRING,
   ts TIMESTAMP,
   WATERMARK ts for ts
) 


create table sink_table (
  name  STRING,
   colors  ARRAY
)


1. 请问这个select语句要怎么写?
select name, collect(color) as colors from source_table group by tumble(ts, 
interval '5' seconds)
这里collect(color)返回的是multiset类型,怎样转成Array类型呢?


2. 如果array元素很多,我只想取其中N个,该怎么写flink sql? 

3, 若取出现次数最多的前N个,又该怎么写flink sql?
select name, collect(color) as colors from (
select name, color from (
  select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY color_cnt desc) AS 
row_num from (
select name, color, count(*) as color_cnt group by name, color, tumble(ts, 
interval '5' seconds)
  ) 
) where row_num < 5
);
是这样写么?

有没有flink on k8s operator相关的中文使用资料

2021-08-11 文章 casel.chen
最近想了解怎么使用flink on k8s operator,查了下业内有lyft和google的,有没有这方面的中文使用资料?
另外想知道ververica platform是不是也是基于flink on k8s operator开发的?社区版是否开源以支持二次开发?

Re: Flink HIve 文件压缩报错

2021-08-11 文章 Rui Li
目前没有办法让作业继续跑,只能重跑了

这里有几个不同的问题:
1. 文件是如何被删掉的,两种可能:被Flink误删(需要在Flink这边修复)、或者被Flink之外的进程删掉
2. 文件丢失意味着exactly once语义无法保证了,这种情况是否可以接受
3. 如果可以接受丢失数据,如何让Flink作业可以继续跑(需要Flink的改进)

On Wed, Aug 11, 2021 at 7:52 PM 周瑞  wrote:

> 您好:
>  这个文件确实不存在了,这种情况目前怎样设置可以让作业继续跑
> 
> 
> --Original--
> From: "Rui Li" Date: Wed, Aug 11, 2021 07:49 PM
> To: "user-zh"
> Subject: Re: Flink HIve 文件压缩报错
>
> 
>
> 这个文件是确实不在了么?是不是被别的进程删掉了呢,可以通过hdfs的audit log来判断一下。
>
> 目前flink这边写文件的exactly
> once语义是依赖HDFS的一致性保证的,如果之前写到HDFS的数据丢掉了就会破坏这个语义了(不过我们可以考虑在这种情况下让作业能继续跑)。
>
> On Tue, Aug 10, 2021 at 7:45 PM 周瑞 
>  您好:Flink
> 
> 写入Hive的时候,在压缩文件的时候有个待压缩的文件丢失了,导致Flink程序一直在不断重启,请问文件丢失是什么原因导致的,这种情况怎么能够让Flink程序正常启动
>  2021-08-10 19:34:19 java.io.UncheckedIOException:
>  java.io.FileNotFoundException: File does not exist:
> 
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
>  at
> 
> org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
>  at
> java.util.HashMap.forEach(HashMap.java:1288)
> at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>  at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>  at
> org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  at org.apache.flink.streaming.
> runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at
>  java.lang.Thread.run(Thread.java:748) Caused by:
>  java.io.FileNotFoundException: File does not exist:
> 
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
>  at
> 
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
>  at
> 
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
>  at
> 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>  at
> 
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
>  at
> 
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
>  ... 19 more
>
>
>
> --
> Best regards!
> Rui Li



-- 
Best regards!
Rui Li


Re: Flink HIve 文件压缩报错

2021-08-11 文章 周瑞
您好:
 这个文件确实不存在了,这种情况目前怎样设置可以让作业继续跑


--Original--
From: "Rui Li"

Re: Flink HIve 文件压缩报错

2021-08-11 文章 Rui Li
这个文件是确实不在了么?是不是被别的进程删掉了呢,可以通过hdfs的audit log来判断一下。

目前flink这边写文件的exactly
once语义是依赖HDFS的一致性保证的,如果之前写到HDFS的数据丢掉了就会破坏这个语义了(不过我们可以考虑在这种情况下让作业能继续跑)。

On Tue, Aug 10, 2021 at 7:45 PM 周瑞  wrote:

> 您好:Flink
> 写入Hive的时候,在压缩文件的时候有个待压缩的文件丢失了,导致Flink程序一直在不断重启,请问文件丢失是什么原因导致的,这种情况怎么能够让Flink程序正常启动
> 2021-08-10 19:34:19 java.io.UncheckedIOException:
> java.io.FileNotFoundException: File does not exist:
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
>  at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
>at
> org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
>   at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
>at java.util.HashMap.forEach(HashMap.java:1288) at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
> at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
>   at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>   at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at
> java.lang.Thread.run(Thread.java:748) Caused by:
> java.io.FileNotFoundException: File does not exist:
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
>at
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
> at
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
>   at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
>... 19 more



-- 
Best regards!
Rui Li


Re:Re: Flink SQL向下兼容吗?

2021-08-11 文章 casel.chen
如果只是数据同步作业,例如从kafka消费将数据存入下游db,这种弱“状态”作业能跨版本兼容么?

















在 2021-08-11 16:54:56,"Leonard Xu"  写道:
>这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级,
>DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的 
>DDL的,
>只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供 。
>
>所以我理解你关心的兼容性问题是不存在的,但请注意如果你的SQL作业是有状态的,需要带状态升级,这些状态都是跨版本不兼容的。
>
>祝好,
>Leonard
>
>> 在 2021年8月10日,11:44,Jason Lee  写道:
>> 
>> 各位大佬好,
>> 
>> 
>> 请教一个问题,我们最近打算升级Flink 版本,想问一下升级之后的已有任务的SQL会兼容到新版本吗?
>> 比如我升级到1.13,那我1.10的SQL语法能被兼容吗?
>> 
>> 
>> 感恩
>> 
>> 
>> | |
>> Chuang Li
>> |
>> |
>> jasonlee1...@163.com
>> |
>> 签名由网易邮箱大师定制
>> 
>


flink sql聚合后collect收集数据问题

2021-08-11 文章 casel.chen
源表三个字段  name, color, ts
按时间窗口聚合后想根据name group by取colors数组


create table source_table (
   name  STRING,
   color  STRING,
   ts TIMESTAMP,
   WATERMARK ts for ts
) 


create table sink_table (
  name  STRING,
   colors  ARRAY
)


1. 请问这个select语句要怎么写?
select name, collect(color) as colors from source_table group by tumble(ts, 
interval '5' seconds)
这里collect(color)返回的是multiset类型,怎样转成Array类型呢?


2. 如果array元素很多,我只想取其中N个,该怎么写flink sql? 

3, 若取出现次数最多的前N个,又该怎么写flink sql?
select name, collect(color) as colors from (
select name, color from (
  select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY color_cnt desc) AS 
row_num from (
select name, color, count(*) as color_cnt group by name, color, tumble(ts, 
interval '5' seconds)
  ) 
) where row_num < 5
);
是这样写么?

Re: Flink SQL向下兼容吗?

2021-08-11 文章 Leonard Xu
这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级,
DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的 
DDL的,
只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供 。

所以我理解你关心的兼容性问题是不存在的,但请注意如果你的SQL作业是有状态的,需要带状态升级,这些状态都是跨版本不兼容的。

祝好,
Leonard

> 在 2021年8月10日,11:44,Jason Lee  写道:
> 
> 各位大佬好,
> 
> 
> 请教一个问题,我们最近打算升级Flink 版本,想问一下升级之后的已有任务的SQL会兼容到新版本吗?
> 比如我升级到1.13,那我1.10的SQL语法能被兼容吗?
> 
> 
> 感恩
> 
> 
> | |
> Chuang Li
> |
> |
> jasonlee1...@163.com
> |
> 签名由网易邮箱大师定制
> 



Re: 退订

2021-08-11 文章 Leonard Xu
如果需要取消订阅 user-zh@flink.apache.org 邮件组,请发送任意内容的邮件到 
user-zh-unsubscr...@flink.apache.org 
Best,
Leonard

> 在 2021年8月6日,10:49,汪嘉富  写道:
> 
> 退订
> 



Re: 退订

2021-08-11 文章 Leonard Xu
如果需要取消订阅 user-zh@flink.apache.org 邮件组,请发送任意内容的邮件到 
user-zh-unsubscr...@flink.apache.org 

Best,
Leonard

> 在 2021年8月11日,08:16,Lee2097  写道:
> 
> 退订



????

2021-08-11 文章 Lee2097


Flink HIve 数据写入后查询无数据

2021-08-11 文章 周瑞
您好:Flink 
写入Hive的时候数据已经写进去了,但是中间发生了异常导致这里的文件没有compact,Hive数据表查不出数据,后续的Flink程序启动后由于数据不再写入这个分区。导致该分区的数据一直无法compact.请问这种异常需要怎么解决,有手动修复的方法么?

Re:Re:Re: cumulate 不能和比较函数连用

2021-08-11 文章 李航飞
抱歉,sql语句是我手打的,没注意到,我确定和这个descriptor没关系。我去掉where条件就能正常运行,同时,我测试in,not in 
函数的时候,会报同样的错误。
At 2021-08-11 13:51:16, "李航飞"  wrote:
>org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
>Could not execute application: 
>org.apache.flink.client.program.ProgramInvocationException: The main 
>method caused an error: Currently Flink doesn't support individual window 
>table-valued function CUMULATE(time_col=[ts], max_size=[10 min], step=[1 
>min]). Please use window table-valued function with aggregate together 
>using window_start and window_end as group keys.at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> ~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-clients_2.12-1.13.1.jar:1.13.1] at 
>org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
>~[flink-clients_2.12-1.13.1.jar:1.13.1]at 
>org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
> ~
>在 2021-08-11 12:44:38,"Caizhi Weng"  写道:
>>Hi!
>>
>>descriptor 拼错了吧。我在本地没有复现这个问题,Flink 版本是多少呢?
>>
>>李航飞  于2021年8月11日周三 上午11:41写道:
>>
>>> sql语句如下:
>>> select count(clicknum) as num
>>>
>>> from table(
>>>
>>> cumulate(table testTable, desctiptor(crtTime),interval '1'minutes,
>>> interval '10' minutes))
>>>
>>> where clicknum <>'-99'
>>>
>>> group by window_start,window_end
>>>
>>>
>>> 报错 信息:
>>> Flink doesn't support individual window table-valued function
>>> cumulate(time_col=[app_date],max_size=[10 min],step=[1 min]...
>>>
>>>
>>> 请问如何解决,谢谢