SQL null 过滤的问题

2021-08-11 Thread silence
flink 版本:1.12
列:col varchar
使用where col is null时可以过滤出col为null的记录
使用where col is null or col = ''时就不可以
同时试了下另外一种写法
where (case when col is null then true else false end) 可以过滤出来
where (case when col is null then true when col = '' then true else false end) 
过滤不出来

请问这个bug吗,还是语法有问题



回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 Thread silence
用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726
不行的话可以在ddl中限制列的数量


--
发件人:Ye Chen 
发送时间:2021年8月2日(星期一) 11:37
收件人:user-zh ; silence 
主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Column types of query result 
and sink for registered table 'default_catalog.default_database.t' do not match.
Cause: Different number of columns.
我们的需求是想根据主键更新部分字段
-
需求:现有table 
CREATE TABLE t (
 abigint,
 bbigint,
 cbigint,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);
我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变,
例如mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
b='4';主键重复的时候只更新字段b,字段c的值不变。
我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理?


在 2021-08-02 10:47:55,"silence"  写道:
>如果只想更新部分字段的话可以试下
>insert into t(a,b) select a,b from x
>
>
>--
>发件人:Ye Chen 
>发送时间:2021年7月30日(星期五) 17:57
>收件人:user-zh 
>主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
>
>现有table 
>CREATE TABLE t (
> abigint,
> bbigint,
> cbigint,
>  PRIMARY KEY (a) NOT ENFORCED
>) WITH (
>...
>);
>
>
>我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
>mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
>b='4';
>主键重复的时候只更新字段b,字段c的值不变
>
>
>我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
>请问这种部分字段更新的场景 使用flink sql应该怎么处理?
>
>





回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 Thread silence
如果只想更新部分字段的话可以试下
insert into t(a,b) select a,b from x


--
发件人:Ye Chen 
发送时间:2021年7月30日(星期五) 17:57
收件人:user-zh 
主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

现有table 
CREATE TABLE t (
 abigint,
 bbigint,
 cbigint,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);


我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
b='4';
主键重复的时候只更新字段b,字段c的值不变


我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
请问这种部分字段更新的场景 使用flink sql应该怎么处理?




回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 Thread silence
你在你的sink ddl定义了主键会自动的按主键进行upsert的
参考https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#idempotent-writes


--
发件人:Ye Chen 
发送时间:2021年7月30日(星期五) 17:57
收件人:user-zh 
主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

现有table 
CREATE TABLE t (
 abigint,
 bbigint,
 cbigint,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
...
);


我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如
mysql 支持   insert into t(a,b,c) select '1','2','3' on duplicate key update 
b='4';
主键重复的时候只更新字段b,字段c的值不变


我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。
请问这种部分字段更新的场景 使用flink sql应该怎么处理?




回复:回复:flink sql 依赖隔离

2021-07-25 Thread silence
就是单独引用的啊,但任务逻辑比较复杂时会同时混用多个udf这个是没法避免的啊


--
发件人:Michael Ran 
发送时间:2021年7月23日(星期五) 17:42
收件人:user-zh ; silence 
主 题:Re:回复:flink sql 依赖隔离

建议上传的时候单独放,提交任务的时候  拉下来单独引用
在 2021-07-23 11:01:59,"silence"  写道:
>
>这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载
>udf和sql jar之间、udf和udf之间都可能会有依赖冲突,
>目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突
>--
>发件人:Michael Ran 
>发送时间:2021年7月22日(星期四) 20:07
>收件人:user-zh ; silence 
>主 题:Re:flink sql 依赖隔离
>
>通过任务进行隔离引用呗。你们美团已经是k8s了吧?
>在 2021-07-05 14:06:53,"silence"  写道:
>>请教大家目前flink sql有没有办法做到依赖隔离
>>比如connector,format,udf(这个最重要)等,
>>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
>



回复:flink sql 依赖隔离

2021-07-22 Thread silence

这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载
udf和sql jar之间、udf和udf之间都可能会有依赖冲突,
目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突
--
发件人:Michael Ran 
发送时间:2021年7月22日(星期四) 20:07
收件人:user-zh ; silence 
主 题:Re:flink sql 依赖隔离

通过任务进行隔离引用呗。你们美团已经是k8s了吧?
在 2021-07-05 14:06:53,"silence"  写道:
>请教大家目前flink sql有没有办法做到依赖隔离
>比如connector,format,udf(这个最重要)等,
>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划



回复:flinksql问题请教

2021-07-06 Thread silence
已解决 where 条件始终为假。
--
发件人:silence 
发送时间:2021年7月7日(星期三) 12:05
收件人:user-zh 
主 题:flinksql问题请教


请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join
flink版本:1.12.2

Stage 1 : Data Source
content : Source: Values(tuples=[[]])
Stage 2 : Operator
content : Correlate(invocation=[LateralArray($cor3.gift_list)], 
correlate=[table(LateralArray($cor3.gift_list))], 
select=[order_id,stack_id,order_item_id,sku_id,sku_name,quantity,product_type,original_price,unit_original_price,promotion_reduce_price,coupon_reduce_price,total_pay,unit_total_pay,is_gift,promotion_id,promotion_type,promotion_quantity,promotion_discount,gift_list,process_time,EXPR$0],
 rowType=[RecordType(BIGINT order_id, BIGINT stack_id, BIGINT order_item_id, 
BIGINT sku_id, VARCHAR(2147483647) sku_name, INTEGER quantity, INTEGER 
product_type, BIGINT original_price, BIGINT unit_original_price, BIGINT 
promotion_reduce_price, BIGINT coupon_reduce_price, BIGINT total_pay, BIGINT 
unit_total_pay, INTEGER is_gift, BIGINT promotion_id, INTEGER promotion_type, 
INTEGER promotion_quantity, BIGINT promotion_discount, VARCHAR(2147483647) 
gift_list, TIME ATTRIBUTE(PROCTIME) process_time, VARCHAR(2147483647) EXPR$0)], 
joinType=[LEFT])
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  promotion_quantity,
  promotion_discount,
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  if(
is_gift = 0,
promotion_quantity,
if(
  gift_item is null,
  promotion_quantity,
  if(
product_type = 1,
cast(
  JsonValue(gift_item, '$.quantity') as INTEGER
),
cast(1 as INTEGER)
  )
)
  ) as promotion_quantity,
  -- 赠品用内部属性复写
  if(
is_gift = 0,
promotion_discount,
if(
  gift_item is null,
  promotion_discount,
  cast(
JsonValue(gift_item, '$.discount') as BIGINT
  )
)
  ) as promotion_discount,
  -- 赠品用内部属性复写
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  cast(JsonValue(sku_info, '$.skuId') as BIGINT) as sku_id,
  JsonValue(sku_info, '$.name') as sku_name,
  if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as quantity,
  -- 1.标品, 2.散装
  cast(JsonValue(sku_info, '$.productType') as INTEGER) as product_type,
  original_price,
  original_price / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  total_pay / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_total_pay,
  -- 单位商品支付金额
  cast(is_gift as INTEGER) as is_gift,
  if(
promotion_detail is null,
cast(-1 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.promotionId') as BIGINT
)
  ) as promotion_id,
  if(
promotion_detail is null,
cast(-1 as INTEGER),
cast(
  JsonValue(promotion_detail, '$.promotionType') as INTEGER
)
  ) as promotion_type,
  if(
promotion_detail is null,
cast(0 as INTEGER),
if(
  cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
  cast(
JsonValue(promotion_detail, '$.quantity') as INTEGER
  ),
  cast(1 as INTEGER)
)
  ) as promotion_quantity,
  if(
promotion_detail is null,
cast(0 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.discount') as BIGINT
)
  ) as promotion_discount,
  JsonValue(promotion_detail, '$.giftList') as gift_list,
  process_time
from
  (
select
  *
from
  (
select
  *,
  row_number

flinksql问题请教

2021-07-06 Thread silence

请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join
flink版本:1.12.2

Stage 1 : Data Source
content : Source: Values(tuples=[[]])
Stage 2 : Operator
content : Correlate(invocation=[LateralArray($cor3.gift_list)], 
correlate=[table(LateralArray($cor3.gift_list))], 
select=[order_id,stack_id,order_item_id,sku_id,sku_name,quantity,product_type,original_price,unit_original_price,promotion_reduce_price,coupon_reduce_price,total_pay,unit_total_pay,is_gift,promotion_id,promotion_type,promotion_quantity,promotion_discount,gift_list,process_time,EXPR$0],
 rowType=[RecordType(BIGINT order_id, BIGINT stack_id, BIGINT order_item_id, 
BIGINT sku_id, VARCHAR(2147483647) sku_name, INTEGER quantity, INTEGER 
product_type, BIGINT original_price, BIGINT unit_original_price, BIGINT 
promotion_reduce_price, BIGINT coupon_reduce_price, BIGINT total_pay, BIGINT 
unit_total_pay, INTEGER is_gift, BIGINT promotion_id, INTEGER promotion_type, 
INTEGER promotion_quantity, BIGINT promotion_discount, VARCHAR(2147483647) 
gift_list, TIME ATTRIBUTE(PROCTIME) process_time, VARCHAR(2147483647) EXPR$0)], 
joinType=[LEFT])
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  promotion_quantity,
  promotion_discount,
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  if(
is_gift = 0,
promotion_quantity,
if(
  gift_item is null,
  promotion_quantity,
  if(
product_type = 1,
cast(
  JsonValue(gift_item, '$.quantity') as INTEGER
),
cast(1 as INTEGER)
  )
)
  ) as promotion_quantity,
  -- 赠品用内部属性复写
  if(
is_gift = 0,
promotion_discount,
if(
  gift_item is null,
  promotion_discount,
  cast(
JsonValue(gift_item, '$.discount') as BIGINT
  )
)
  ) as promotion_discount,
  -- 赠品用内部属性复写
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  cast(JsonValue(sku_info, '$.skuId') as BIGINT) as sku_id,
  JsonValue(sku_info, '$.name') as sku_name,
  if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as quantity,
  -- 1.标品, 2.散装
  cast(JsonValue(sku_info, '$.productType') as INTEGER) as product_type,
  original_price,
  original_price / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  total_pay / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_total_pay,
  -- 单位商品支付金额
  cast(is_gift as INTEGER) as is_gift,
  if(
promotion_detail is null,
cast(-1 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.promotionId') as BIGINT
)
  ) as promotion_id,
  if(
promotion_detail is null,
cast(-1 as INTEGER),
cast(
  JsonValue(promotion_detail, '$.promotionType') as INTEGER
)
  ) as promotion_type,
  if(
promotion_detail is null,
cast(0 as INTEGER),
if(
  cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
  cast(
JsonValue(promotion_detail, '$.quantity') as INTEGER
  ),
  cast(1 as INTEGER)
)
  ) as promotion_quantity,
  if(
promotion_detail is null,
cast(0 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.discount') as BIGINT
)
  ) as promotion_discount,
  JsonValue(promotion_detail, '$.giftList') as gift_list,
  process_time
from
  (
select
  *
from
  (
select
  *,
  row_number() over (
partition by order_id,
order_item_id,
stack_id
order by
 

回复:flink sql 依赖隔离

2021-07-05 Thread silence

没用放在lib下,是启动时通过-C动态添加udf jar,一个sql作业可能会用到很多udf,可能是不同的用户写的,所以经常会出现依赖冲突
--
发件人:yzhhui 
发送时间:2021年7月5日(星期一) 14:09
收件人:user-zh@flink.apache.org ; silence 

抄 送:user-zh 
主 题:回复:flink sql 依赖隔离

 
提交任务的时候提交自己的jar就好了,这个不要放公共lib下 就OK

在2021年07月5日 14:07,silence 写道:  请教大家目前flink 
sql有没有办法做到依赖隔离
比如connector,format,udf(这个最重要)等,
很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划



flink sql 依赖隔离

2021-07-05 Thread silence
请教大家目前flink sql有没有办法做到依赖隔离
比如connector,format,udf(这个最重要)等,
很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划

回复:普通表join版本表,怎么得到append表

2021-06-30 Thread silence
目前interval join和维表的时态join不会进行回撤,其他场景会产生回撤数据


--
发件人:杨光跃 
发送时间:2021年6月30日(星期三) 17:47
收件人:user-zh@flink.apache.org 
主 题:普通表join版本表,怎么得到append表

大佬们,请教个问题,
insert into sink_2 
select a.`time`,c.cust,b.mobile
from case2_TOPIC_A a
left join card_data b on a.card = b.card
left join view_new_card_info c on a.card = c.card;




case2_TOPIC_A  是一个普通表,view_new_card_info 是维表, 我要插入的 sink_2 其实应该是一个apped表。  
为什么提交的时候要求 
please declare primary key for sink table when query contains update/delete 
record.


我这个只需要追加就可以了吧,该怎么处理呢?


| |
杨光跃
|
|
yangguangyuem...@163.com
|
签名由网易邮箱大师定制



回复:flink sql 空闲数据源场景如何配置

2021-06-30 Thread silence
可参考 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout


--
发件人:杨光跃 
发送时间:2021年6月30日(星期三) 10:54
收件人:user-zh@flink.apache.org 
主 题:flink sql 空闲数据源场景如何配置

在代码中可以通过  .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢


| |
杨光跃
|
|
yangguangyuem...@163.com
|
签名由网易邮箱大师定制

Serializer consumed more bytes than the record had

2021-06-11 Thread silence
flink 版本1.12
异常如下:
java.io.IOException: Can't get next record for channel
InputChannelInfo{gateIdx=0, inputChannelIdx=0}
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
... 8 more
Caused by: java.lang.IndexOutOfBoundsException: pos: 140427053897089,
length: 16805746, index: 17, offset: 0
at
org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:224)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
... 11 more

大概是什么原因



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 如何统计n小时内,flink成功从kafka消费的数据量?

2021-05-18 Thread silence
可以在metrics 上报时或落地前对source两次上报间隔的numRecordsOut值进行相减,最后呈现的时候按时间段累计就可以了



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Table-api sql 预检查

2021-04-29 Thread silence
可以用explain



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql如何从远程加载jar包中的udf

2021-03-11 Thread silence
启动时通过-C加到classpath里试试



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 提交两个SQL任务,其中一个不生效。

2021-03-10 Thread silence
多个insert的话要用statementset去提交



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-09 Thread silence
个人也维护了个flink平台的开源项目,希望可以帮助到你
https://github.com/hairless/plink



--
Sent from: http://apache-flink.147419.n8.nabble.com/


[sql]TimeStamp和异常格式的字符串进行比较时会报空指针

2021-03-05 Thread silence
问题描述:
TimeStamp类型和异常格式的字符串进行比较时会在任务运行时报空指针
像这种错误虽然是用户书写错误导致的,但运行时才能发现问题,且sql太长时不好定位具体原因
是否可以在编译期进行类型的验证,尽早发现问题并给出sql的文本坐标

例:where CURRENT_TIMESTAMP=''
  where CURRENT_TIMESTAMP='19700101'

java.lang.NullPointerException: null
at
org.apache.flink.table.data.TimestampData.compareTo(TimestampData.java:112)
at StreamExecCalc$4.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:76)
at
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 通过普通ddl来读写hive

2021-02-23 Thread silence
那用自定义的catalog怎么定义hive表来读写hive呢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 通过普通ddl来读写hive

2021-02-23 Thread silence
我理解各个公司都会有自己的元数据管理平台,hive表的创建修改都需要经过严格的权限控制在平台上进行操作,包括调度任务、实时写入任务、数据血缘等。
我个人觉得理想的方式是单个flink
sql的所有的connector通过自维护的元数据进行生成,不需要引入hivecatalog,使用默认的MemoryCatalog即可。
总结一下就是不希望引入HiveCatalog来进行hive表的读写



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 通过普通ddl来读写hive

2021-02-23 Thread silence
你好
感谢回复
主要有以下几点原因:
1、直接使用hive catalog进行hive表的创建修改风险太高,更希望在平台层限制hive表的创建和修改
2、connector的配置是保存在hive表的DBPROPERTIES里的,这是否就意味着想通过flink往现有hive表里写数据需要先通过alter语句修改hive表的属性配置,这里不希望对用户直接暴露alter
hive的能力
3、使用普通的ddl可以与现有connector的定义统一风格,不需要来回切换方言
4、可以不用将配置信息持久化,通过GenericInMemoryCatalog使用即可



--
Sent from: http://apache-flink.147419.n8.nabble.com/

通过普通ddl来读写hive

2021-02-22 Thread silence
问一下社区有没有计划支持普通的ddl(不用hive的catalog)来进行读写hive表吗
现在不支持是有什么考虑吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.12 不能同时在一个工程消费jdbc和kafka CDC数据

2021-02-18 Thread silence


可以尝试在shade插件里加个transformer




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-1.12 通过-t指定模式后无法指定yarn参数

2021-01-27 Thread silence
flink1.12后所有的yarn相关的参数通过-D进行指定
例:-D yarn.application.name=xxx 替代以前的-ynm xxx
更多配置参考文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请教关于Flink yarnship的使用

2021-01-22 Thread silence
 你可以尝试同时指定-C "file:///path/to/conf/cmp_online.cfg" 以及 -yt /path/to/conf 来进行测试
然后代码里这么获取this.getClass().getResourceAsStream("cmp_online.cfg")



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql 数组下标问题

2020-12-31 Thread silence
flink sql官方文档中数组的取值方式如下定义
array ‘[’ integer ‘]’  Returns the element at position integer in array. The
index starts from 1.
参考链接
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#collection-functions

主要问题就是数组的下标是从1开始的,这不符合数组从0开始的常识,也和hive sql不兼容,在实时和离线开发中经常会导致很多数据问题排查起来很困难。
因此,在考虑兼容历史flink sql版本的情况下能否通过增加配置来设置数组开始的下标,来兼容数组下标从0开始的使用习惯

谢谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: yarn application模式提交任务失败

2020-12-20 Thread silence
应该是-D不是-yD



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-shaded-hadoop-2-uber*-* 版本确定问题

2020-12-13 Thread silence
flink已经不建议将hadoop的jar放到lib里了

可以通过
export HADOOP_CLASSPATH=`hadoop classpath` 
加载hadoop的依赖
参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink使用多个keytab

2020-12-06 Thread silence
这个问题我们也遇到过,目前这个issue在跟进,https://issues.apache.org/jira/browse/FLINK-12130



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-06 Thread silence
可以用string



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 Thread silence
你好:

这个原因最开始已经说明了,main jar就是将传入的sql参数进行解析封装,而sql里用到的udf、connector之类的类型希望可以做到动态指定
一方面可以做到灵活的依赖控制,减少main jar的大小
另一方吧可以减少不同connector和udf,或不同版本connector和udf的依赖冲突的可能性

ps:假如平台有数十种connector和数百个udf都打到一个fast jar里想想都觉得不太优雅吧



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 Thread silence
看了很多同学回复yarn的解决方案

我这再补充一下:
还是希望可以提供更通用的submit参数来解决此问题,
包括提交到standalone集群时可以额外指定本地依赖jar

有没有cli相关的同学可以跟进下建议
谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:请教一下目前flink submit能不能指定额外的依赖jar

2020-11-05 Thread silence
感谢回复,还是希望可以从submit上解决这个问题,不能添加依赖限制了很多应用场景,特别是针对平台来说



--
Sent from: http://apache-flink.147419.n8.nabble.com/

请教一下目前flink submit能不能指定额外的依赖jar

2020-11-05 Thread silence
大家好

由于目前用了flink SQL封装了jar包,sql是作为参数动态传入的,
因此需要动态的引入一下依赖jar,比如udf jar,connector的jar等,
由于不同任务的依赖jar是不同的,不适合都放在flink lib目录下(可能会冲突)
因此想请教一下有没有办法在submit时才指定任务依赖的jar包,类似spark submit的--jars
没有的话有没有相关的issue可以跟进这个问题

谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink任务挂掉后自动重启

2020-11-01 Thread silence
说一下我们平台的实现方式
1、自定义metricReporter,假如任务开启了checkpoint,reporter会自动的将最新完成的checkpoint路径进行上报
   
可参考https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#checkpointing
2、平台会有是否重试和是否基于checkpoint进行恢复的选项
3、假如上述两选项都开启了之后,可以对运行失败的任务基于最新的checkpoint进行拉起



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 官方后续会有支持kafka lag metric的计划吗

2020-10-28 Thread silence
hi zhisheng
我找到两篇相关的参考博客你看一下
https://blog.csdn.net/a1240466196/article/details/107853926
https://www.jianshu.com/p/c7515bdde1f7



--
Sent from: http://apache-flink.147419.n8.nabble.com/


官方后续会有支持kafka lag metric的计划吗

2020-10-28 Thread silence
目前消费kafka会有lag的情况发生,因此想基于flink metric进行上报监控kakfa的消费延时情况
主要是两种情况:
1、当前group消费的offset和对应topic最大offset之间的差值,也就是积压的数据量
2、当前消费的最新记录的timestamp和系统时间之间的差值,也就是消费的时间延时

kafka lag的监控对实时任务的稳定运行有着非常重要的作用,
网上也检索到了一些基于源码修改的实现,但修改源码的话也不利于后面flink版本的升级,还是希望官方可以考虑支持一下



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.11里如何parse出未解析的执行计划

2020-10-21 Thread silence
我简单写了一下仅供参考

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;

/**
 * @author: silence
 * @date: 2020/10/22
 */
public class Test {
public static void main(String[] args) throws SqlParseException {
String sql = "xxx";
SqlParser.Config sqlParserConfig = SqlParser
.configBuilder()
.setParserFactory(FlinkSqlParserImpl.FACTORY)
.setConformance(FlinkSqlConformance.DEFAULT)
.setLex(Lex.JAVA)
.setIdentifierMaxLength(256)
.build();
SqlParser sqlParser = SqlParser.create(sql, sqlParserConfig);
SqlNodeList sqlNodes = sqlParser.parseStmtList();
for (SqlNode sqlNode : sqlNodes) {
//do something
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-27 Thread silence
也可以通过普通的非窗口聚合进行实现吧,minibatch设大点



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [SQL] parse table name from sql statement

2020-09-21 Thread silence
我写过一个类似的可以参考一下

private static List lookupSelectTable(SqlNode sqlNode) {
List list = new ArrayList<>();
if (sqlNode instanceof SqlSelect) {
SqlNode from = ((SqlSelect) sqlNode).getFrom();
list.addAll(lookupSelectTable(from));
} else if (sqlNode instanceof SqlJoin) {
SqlJoin sqlJoin = (SqlJoin) sqlNode;
list.addAll(lookupSelectTable(sqlJoin.getLeft()));
list.addAll(lookupSelectTable(sqlJoin.getRight()));
} else if (sqlNode instanceof SqlBasicCall) {
SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
SqlOperator operator = sqlBasicCall.getOperator();
if (SqlKind.AS.equals(operator.getKind())) {
   
list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0]));
} else if (SqlKind.UNION.equals(operator.getKind())) {
for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) {
list.addAll(lookupSelectTable(operandSqlNode));
}
} else {
throw new RuntimeException("operator " + operator.getKind()
+ " not support");
}
} else if (sqlNode instanceof SqlIdentifier) {
list.add(((SqlIdentifier) sqlNode).getSimple());
} else {
throw new RuntimeException("operator " + sqlNode.getClass() + "
not support");
}
return list;
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [SQL] parse table name from sql statement

2020-09-21 Thread silence
写过一个类似的可以参考一下

private static List lookupSelectTable(SqlNode sqlNode) {
List list = new ArrayList<>();
if (sqlNode instanceof SqlSelect) {
SqlNode from = ((SqlSelect) sqlNode).getFrom();
list.addAll(lookupSelectTable(from));
} else if (sqlNode instanceof SqlJoin) {
SqlJoin sqlJoin = (SqlJoin) sqlNode;
list.addAll(lookupSelectTable(sqlJoin.getLeft()));
list.addAll(lookupSelectTable(sqlJoin.getRight()));
} else if (sqlNode instanceof SqlBasicCall) {
SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
SqlOperator operator = sqlBasicCall.getOperator();
if (SqlKind.AS.equals(operator.getKind())) {
   
list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0]));
} else if (SqlKind.UNION.equals(operator.getKind())) {
for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) {
list.addAll(lookupSelectTable(operandSqlNode));
}
} else {
throw new RuntimeException("operator " + operator.getKind()
+ " not support");
}
} else if (sqlNode instanceof SqlIdentifier) {
list.add(((SqlIdentifier) sqlNode).getSimple());
} else {
throw new RuntimeException("operator " + sqlNode.getClass() + "
not support");
}
return list;
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-CDC client 一对多问题

2020-09-21 Thread silence
可以写一个group_array的udaf
select  *  from aa as a left join (
select userId,group_array(row(userId, userBankNo, userBankNo)) from bb
group by userId
) as b where a.userId=b.userId




--
Sent from: http://apache-flink.147419.n8.nabble.com/


请教大家如何注册支持多返回值类型的UDAF

2020-09-16 Thread silence
如题,最近想实现一些类似于LAST_VALUE之类的UDAF,看了官网文档自己写了一下目前有以下一些疑问:

1、聚合结果需要重写AggregateFunction的getValue方法,而该方法需要返回固定的数据类型,如果要实现不同返回值的UDAF是否需要进行多个实现?

​2、如果是需要多个实现类的话如何注册到同一个方法名上?测试发现后注册的UDAF会覆盖之前的注册,也就是只有最后注册的UDAF生效,还是只能支持一种数据类型

​3、看了源码中的aggFuction的注册过程,发现也是对不同的数据类型进行了多次实现,然后在使用时根据参数的类型进行不同的实现类的创建,最后的疑问就是现有基于现有的flink
 api如果实现类似的效果呢?
感谢大佬们的解答



Re: flink sql执行sql语句无法执行的错误-No operators defined in streaming topology. Cannot execute.

2020-09-11 Thread silence
没有insert语句也就是没有sink无法触发计算



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-10 Thread silence
个人理解有几种实现方案
1、通过主键加LAST_VALUE()使用最新的记录进行计算
2、通过flink-cdc connector source
3、自己根据操作类型写计算逻辑




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: flink-sql 1.11版本都还没完全支持checkpoint吗

2020-09-08 Thread silence
手动停止再恢复的话需要启动时通过 (-s 上一次checkpoint的mate路径)进行恢复
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/check
points.html#resuming-from-a-retained-checkpoint
-邮件原件-
发件人: 凌天荣 <466792...@qq.com> 
发送时间: 2020年9月8日 15:50
收件人: user-zh 
主题: flink-sql 1.11版本都还没完全支持checkpoint吗

代码里设置了enableCheckpointing,任务停掉后,重启,还是没能消费停掉期间的数
据,也就是checkpoint没生效