SQL null 过滤的问题
flink 版本:1.12 列:col varchar 使用where col is null时可以过滤出col为null的记录 使用where col is null or col = ''时就不可以 同时试了下另外一种写法 where (case when col is null then true else false end) 可以过滤出来 where (case when col is null then true when col = '' then true else false end) 过滤不出来 请问这个bug吗,还是语法有问题
回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726 不行的话可以在ddl中限制列的数量 -- 发件人:Ye Chen 发送时间:2021年8月2日(星期一) 11:37 收件人:user-zh ; silence 主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? 你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.t' do not match. Cause: Different number of columns. 我们的需求是想根据主键更新部分字段 - 需求:现有table CREATE TABLE t ( abigint, bbigint, cbigint, PRIMARY KEY (a) NOT ENFORCED ) WITH ( ... ); 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变, 例如mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update b='4';主键重复的时候只更新字段b,字段c的值不变。 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? 在 2021-08-02 10:47:55,"silence" 写道: >如果只想更新部分字段的话可以试下 >insert into t(a,b) select a,b from x > > >-- >发件人:Ye Chen >发送时间:2021年7月30日(星期五) 17:57 >收件人:user-zh >主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? > >现有table >CREATE TABLE t ( > abigint, > bbigint, > cbigint, > PRIMARY KEY (a) NOT ENFORCED >) WITH ( >... >); > > >我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 >mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update >b='4'; >主键重复的时候只更新字段b,字段c的值不变 > > >我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 >请问这种部分字段更新的场景 使用flink sql应该怎么处理? > >
回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
如果只想更新部分字段的话可以试下 insert into t(a,b) select a,b from x -- 发件人:Ye Chen 发送时间:2021年7月30日(星期五) 17:57 收件人:user-zh 主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? 现有table CREATE TABLE t ( abigint, bbigint, cbigint, PRIMARY KEY (a) NOT ENFORCED ) WITH ( ... ); 我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update b='4'; 主键重复的时候只更新字段b,字段c的值不变 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 请问这种部分字段更新的场景 使用flink sql应该怎么处理?
回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
你在你的sink ddl定义了主键会自动的按主键进行upsert的 参考https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#idempotent-writes -- 发件人:Ye Chen 发送时间:2021年7月30日(星期五) 17:57 收件人:user-zh 主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? 现有table CREATE TABLE t ( abigint, bbigint, cbigint, PRIMARY KEY (a) NOT ENFORCED ) WITH ( ... ); 我们的场景只想根据主键a更新部分字段b,其余的字段保持不变,例如 mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key update b='4'; 主键重复的时候只更新字段b,字段c的值不变 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 请问这种部分字段更新的场景 使用flink sql应该怎么处理?
回复:回复:flink sql 依赖隔离
就是单独引用的啊,但任务逻辑比较复杂时会同时混用多个udf这个是没法避免的啊 -- 发件人:Michael Ran 发送时间:2021年7月23日(星期五) 17:42 收件人:user-zh ; silence 主 题:Re:回复:flink sql 依赖隔离 建议上传的时候单独放,提交任务的时候 拉下来单独引用 在 2021-07-23 11:01:59,"silence" 写道: > >这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载 >udf和sql jar之间、udf和udf之间都可能会有依赖冲突, >目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突 >-- >发件人:Michael Ran >发送时间:2021年7月22日(星期四) 20:07 >收件人:user-zh ; silence >主 题:Re:flink sql 依赖隔离 > >通过任务进行隔离引用呗。你们美团已经是k8s了吧? >在 2021-07-05 14:06:53,"silence" 写道: >>请教大家目前flink sql有没有办法做到依赖隔离 >>比如connector,format,udf(这个最重要)等, >>很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。 >>目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划 >
回复:flink sql 依赖隔离
这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载 udf和sql jar之间、udf和udf之间都可能会有依赖冲突, 目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突 -- 发件人:Michael Ran 发送时间:2021年7月22日(星期四) 20:07 收件人:user-zh ; silence 主 题:Re:flink sql 依赖隔离 通过任务进行隔离引用呗。你们美团已经是k8s了吧? 在 2021-07-05 14:06:53,"silence" 写道: >请教大家目前flink sql有没有办法做到依赖隔离 >比如connector,format,udf(这个最重要)等, >很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。 >目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
回复:flinksql问题请教
已解决 where 条件始终为假。 -- 发件人:silence 发送时间:2021年7月7日(星期三) 12:05 收件人:user-zh 主 题:flinksql问题请教 请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join flink版本:1.12.2 Stage 1 : Data Source content : Source: Values(tuples=[[]]) Stage 2 : Operator content : Correlate(invocation=[LateralArray($cor3.gift_list)], correlate=[table(LateralArray($cor3.gift_list))], select=[order_id,stack_id,order_item_id,sku_id,sku_name,quantity,product_type,original_price,unit_original_price,promotion_reduce_price,coupon_reduce_price,total_pay,unit_total_pay,is_gift,promotion_id,promotion_type,promotion_quantity,promotion_discount,gift_list,process_time,EXPR$0], rowType=[RecordType(BIGINT order_id, BIGINT stack_id, BIGINT order_item_id, BIGINT sku_id, VARCHAR(2147483647) sku_name, INTEGER quantity, INTEGER product_type, BIGINT original_price, BIGINT unit_original_price, BIGINT promotion_reduce_price, BIGINT coupon_reduce_price, BIGINT total_pay, BIGINT unit_total_pay, INTEGER is_gift, BIGINT promotion_id, INTEGER promotion_type, INTEGER promotion_quantity, BIGINT promotion_discount, VARCHAR(2147483647) gift_list, TIME ATTRIBUTE(PROCTIME) process_time, VARCHAR(2147483647) EXPR$0)], joinType=[LEFT]) select order_id, stack_id, order_item_id, sku_id, sku_name, quantity, product_type, original_price, unit_original_price, promotion_reduce_price, coupon_reduce_price, total_pay, unit_total_pay, is_gift, promotion_id, promotion_type, promotion_quantity, promotion_discount, process_time from ( select order_id, stack_id, order_item_id, sku_id, sku_name, quantity, product_type, original_price, unit_original_price, promotion_reduce_price, coupon_reduce_price, total_pay, unit_total_pay, is_gift, promotion_id, promotion_type, if( is_gift = 0, promotion_quantity, if( gift_item is null, promotion_quantity, if( product_type = 1, cast( JsonValue(gift_item, '$.quantity') as INTEGER ), cast(1 as INTEGER) ) ) ) as promotion_quantity, -- 赠品用内部属性复写 if( is_gift = 0, promotion_discount, if( gift_item is null, promotion_discount, cast( JsonValue(gift_item, '$.discount') as BIGINT ) ) ) as promotion_discount, -- 赠品用内部属性复写 process_time from ( select order_id, stack_id, order_item_id, cast(JsonValue(sku_info, '$.skuId') as BIGINT) as sku_id, JsonValue(sku_info, '$.name') as sku_name, if( cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1, cast(JsonValue(sku_info, '$.quantity') as INTEGER), 1 ) as quantity, -- 1.标品, 2.散装 cast(JsonValue(sku_info, '$.productType') as INTEGER) as product_type, original_price, original_price / if( cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1, cast(JsonValue(sku_info, '$.quantity') as INTEGER), 1 ) as unit_original_price, promotion_reduce_price, coupon_reduce_price, total_pay, total_pay / if( cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1, cast(JsonValue(sku_info, '$.quantity') as INTEGER), 1 ) as unit_total_pay, -- 单位商品支付金额 cast(is_gift as INTEGER) as is_gift, if( promotion_detail is null, cast(-1 as BIGINT), cast( JsonValue(promotion_detail, '$.promotionId') as BIGINT ) ) as promotion_id, if( promotion_detail is null, cast(-1 as INTEGER), cast( JsonValue(promotion_detail, '$.promotionType') as INTEGER ) ) as promotion_type, if( promotion_detail is null, cast(0 as INTEGER), if( cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1, cast( JsonValue(promotion_detail, '$.quantity') as INTEGER ), cast(1 as INTEGER) ) ) as promotion_quantity, if( promotion_detail is null, cast(0 as BIGINT), cast( JsonValue(promotion_detail, '$.discount') as BIGINT ) ) as promotion_discount, JsonValue(promotion_detail, '$.giftList') as gift_list, process_time from ( select * from ( select *, row_number
flinksql问题请教
请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join flink版本:1.12.2 Stage 1 : Data Source content : Source: Values(tuples=[[]]) Stage 2 : Operator content : Correlate(invocation=[LateralArray($cor3.gift_list)], correlate=[table(LateralArray($cor3.gift_list))], select=[order_id,stack_id,order_item_id,sku_id,sku_name,quantity,product_type,original_price,unit_original_price,promotion_reduce_price,coupon_reduce_price,total_pay,unit_total_pay,is_gift,promotion_id,promotion_type,promotion_quantity,promotion_discount,gift_list,process_time,EXPR$0], rowType=[RecordType(BIGINT order_id, BIGINT stack_id, BIGINT order_item_id, BIGINT sku_id, VARCHAR(2147483647) sku_name, INTEGER quantity, INTEGER product_type, BIGINT original_price, BIGINT unit_original_price, BIGINT promotion_reduce_price, BIGINT coupon_reduce_price, BIGINT total_pay, BIGINT unit_total_pay, INTEGER is_gift, BIGINT promotion_id, INTEGER promotion_type, INTEGER promotion_quantity, BIGINT promotion_discount, VARCHAR(2147483647) gift_list, TIME ATTRIBUTE(PROCTIME) process_time, VARCHAR(2147483647) EXPR$0)], joinType=[LEFT]) select order_id, stack_id, order_item_id, sku_id, sku_name, quantity, product_type, original_price, unit_original_price, promotion_reduce_price, coupon_reduce_price, total_pay, unit_total_pay, is_gift, promotion_id, promotion_type, promotion_quantity, promotion_discount, process_time from ( select order_id, stack_id, order_item_id, sku_id, sku_name, quantity, product_type, original_price, unit_original_price, promotion_reduce_price, coupon_reduce_price, total_pay, unit_total_pay, is_gift, promotion_id, promotion_type, if( is_gift = 0, promotion_quantity, if( gift_item is null, promotion_quantity, if( product_type = 1, cast( JsonValue(gift_item, '$.quantity') as INTEGER ), cast(1 as INTEGER) ) ) ) as promotion_quantity, -- 赠品用内部属性复写 if( is_gift = 0, promotion_discount, if( gift_item is null, promotion_discount, cast( JsonValue(gift_item, '$.discount') as BIGINT ) ) ) as promotion_discount, -- 赠品用内部属性复写 process_time from ( select order_id, stack_id, order_item_id, cast(JsonValue(sku_info, '$.skuId') as BIGINT) as sku_id, JsonValue(sku_info, '$.name') as sku_name, if( cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1, cast(JsonValue(sku_info, '$.quantity') as INTEGER), 1 ) as quantity, -- 1.标品, 2.散装 cast(JsonValue(sku_info, '$.productType') as INTEGER) as product_type, original_price, original_price / if( cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1, cast(JsonValue(sku_info, '$.quantity') as INTEGER), 1 ) as unit_original_price, promotion_reduce_price, coupon_reduce_price, total_pay, total_pay / if( cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1, cast(JsonValue(sku_info, '$.quantity') as INTEGER), 1 ) as unit_total_pay, -- 单位商品支付金额 cast(is_gift as INTEGER) as is_gift, if( promotion_detail is null, cast(-1 as BIGINT), cast( JsonValue(promotion_detail, '$.promotionId') as BIGINT ) ) as promotion_id, if( promotion_detail is null, cast(-1 as INTEGER), cast( JsonValue(promotion_detail, '$.promotionType') as INTEGER ) ) as promotion_type, if( promotion_detail is null, cast(0 as INTEGER), if( cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1, cast( JsonValue(promotion_detail, '$.quantity') as INTEGER ), cast(1 as INTEGER) ) ) as promotion_quantity, if( promotion_detail is null, cast(0 as BIGINT), cast( JsonValue(promotion_detail, '$.discount') as BIGINT ) ) as promotion_discount, JsonValue(promotion_detail, '$.giftList') as gift_list, process_time from ( select * from ( select *, row_number() over ( partition by order_id, order_item_id, stack_id order by
回复:flink sql 依赖隔离
没用放在lib下,是启动时通过-C动态添加udf jar,一个sql作业可能会用到很多udf,可能是不同的用户写的,所以经常会出现依赖冲突 -- 发件人:yzhhui 发送时间:2021年7月5日(星期一) 14:09 收件人:user-zh@flink.apache.org ; silence 抄 送:user-zh 主 题:回复:flink sql 依赖隔离 提交任务的时候提交自己的jar就好了,这个不要放公共lib下 就OK 在2021年07月5日 14:07,silence 写道: 请教大家目前flink sql有没有办法做到依赖隔离 比如connector,format,udf(这个最重要)等, 很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。 目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
flink sql 依赖隔离
请教大家目前flink sql有没有办法做到依赖隔离 比如connector,format,udf(这个最重要)等, 很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。 目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
回复:普通表join版本表,怎么得到append表
目前interval join和维表的时态join不会进行回撤,其他场景会产生回撤数据 -- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 17:47 收件人:user-zh@flink.apache.org 主 题:普通表join版本表,怎么得到append表 大佬们,请教个问题, insert into sink_2 select a.`time`,c.cust,b.mobile from case2_TOPIC_A a left join card_data b on a.card = b.card left join view_new_card_info c on a.card = c.card; case2_TOPIC_A 是一个普通表,view_new_card_info 是维表, 我要插入的 sink_2 其实应该是一个apped表。 为什么提交的时候要求 please declare primary key for sink table when query contains update/delete record. 我这个只需要追加就可以了吧,该怎么处理呢? | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制
回复:flink sql 空闲数据源场景如何配置
可参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout -- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 10:54 收件人:user-zh@flink.apache.org 主 题:flink sql 空闲数据源场景如何配置 在代码中可以通过 .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制
Serializer consumed more bytes than the record had
flink 版本1.12 异常如下: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0} at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer. at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163) ... 8 more Caused by: java.lang.IndexOutOfBoundsException: pos: 140427053897089, length: 16805746, index: 17, offset: 0 at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:224) at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101) at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92) at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108) at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53) at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337) ... 11 more 大概是什么原因 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 如何统计n小时内,flink成功从kafka消费的数据量?
可以在metrics 上报时或落地前对source两次上报间隔的numRecordsOut值进行相减,最后呈现的时候按时间段累计就可以了 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Table-api sql 预检查
可以用explain -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql如何从远程加载jar包中的udf
启动时通过-C加到classpath里试试 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 提交两个SQL任务,其中一个不生效。
多个insert的话要用statementset去提交 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Job 如何集成到自己的系统,方便管理
个人也维护了个flink平台的开源项目,希望可以帮助到你 https://github.com/hairless/plink -- Sent from: http://apache-flink.147419.n8.nabble.com/
[sql]TimeStamp和异常格式的字符串进行比较时会报空指针
问题描述: TimeStamp类型和异常格式的字符串进行比较时会在任务运行时报空指针 像这种错误虽然是用户书写错误导致的,但运行时才能发现问题,且sql太长时不好定位具体原因 是否可以在编译期进行类型的验证,尽早发现问题并给出sql的文本坐标 例:where CURRENT_TIMESTAMP='' where CURRENT_TIMESTAMP='19700101' java.lang.NullPointerException: null at org.apache.flink.table.data.TimestampData.compareTo(TimestampData.java:112) at StreamExecCalc$4.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:76) at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 通过普通ddl来读写hive
那用自定义的catalog怎么定义hive表来读写hive呢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 通过普通ddl来读写hive
我理解各个公司都会有自己的元数据管理平台,hive表的创建修改都需要经过严格的权限控制在平台上进行操作,包括调度任务、实时写入任务、数据血缘等。 我个人觉得理想的方式是单个flink sql的所有的connector通过自维护的元数据进行生成,不需要引入hivecatalog,使用默认的MemoryCatalog即可。 总结一下就是不希望引入HiveCatalog来进行hive表的读写 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 通过普通ddl来读写hive
你好 感谢回复 主要有以下几点原因: 1、直接使用hive catalog进行hive表的创建修改风险太高,更希望在平台层限制hive表的创建和修改 2、connector的配置是保存在hive表的DBPROPERTIES里的,这是否就意味着想通过flink往现有hive表里写数据需要先通过alter语句修改hive表的属性配置,这里不希望对用户直接暴露alter hive的能力 3、使用普通的ddl可以与现有connector的定义统一风格,不需要来回切换方言 4、可以不用将配置信息持久化,通过GenericInMemoryCatalog使用即可 -- Sent from: http://apache-flink.147419.n8.nabble.com/
通过普通ddl来读写hive
问一下社区有没有计划支持普通的ddl(不用hive的catalog)来进行读写hive表吗 现在不支持是有什么考虑吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.12 不能同时在一个工程消费jdbc和kafka CDC数据
可以尝试在shade插件里加个transformer -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-1.12 通过-t指定模式后无法指定yarn参数
flink1.12后所有的yarn相关的参数通过-D进行指定 例:-D yarn.application.name=xxx 替代以前的-ynm xxx 更多配置参考文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 请教关于Flink yarnship的使用
你可以尝试同时指定-C "file:///path/to/conf/cmp_online.cfg" 以及 -yt /path/to/conf 来进行测试 然后代码里这么获取this.getClass().getResourceAsStream("cmp_online.cfg") -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql 数组下标问题
flink sql官方文档中数组的取值方式如下定义 array ‘[’ integer ‘]’ Returns the element at position integer in array. The index starts from 1. 参考链接 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#collection-functions 主要问题就是数组的下标是从1开始的,这不符合数组从0开始的常识,也和hive sql不兼容,在实时和离线开发中经常会导致很多数据问题排查起来很困难。 因此,在考虑兼容历史flink sql版本的情况下能否通过增加配置来设置数组开始的下标,来兼容数组下标从0开始的使用习惯 谢谢。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: yarn application模式提交任务失败
应该是-D不是-yD -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-shaded-hadoop-2-uber*-* 版本确定问题
flink已经不建议将hadoop的jar放到lib里了 可以通过 export HADOOP_CLASSPATH=`hadoop classpath` 加载hadoop的依赖 参考链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink使用多个keytab
这个问题我们也遇到过,目前这个issue在跟进,https://issues.apache.org/jira/browse/FLINK-12130 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL如何定义JsonObject数据的字段类型
可以用string -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 请教一下目前flink submit能不能指定额外的依赖jar
你好: 这个原因最开始已经说明了,main jar就是将传入的sql参数进行解析封装,而sql里用到的udf、connector之类的类型希望可以做到动态指定 一方面可以做到灵活的依赖控制,减少main jar的大小 另一方吧可以减少不同connector和udf,或不同版本connector和udf的依赖冲突的可能性 ps:假如平台有数十种connector和数百个udf都打到一个fast jar里想想都觉得不太优雅吧 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 请教一下目前flink submit能不能指定额外的依赖jar
看了很多同学回复yarn的解决方案 我这再补充一下: 还是希望可以提供更通用的submit参数来解决此问题, 包括提交到standalone集群时可以额外指定本地依赖jar 有没有cli相关的同学可以跟进下建议 谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re:请教一下目前flink submit能不能指定额外的依赖jar
感谢回复,还是希望可以从submit上解决这个问题,不能添加依赖限制了很多应用场景,特别是针对平台来说 -- Sent from: http://apache-flink.147419.n8.nabble.com/
请教一下目前flink submit能不能指定额外的依赖jar
大家好 由于目前用了flink SQL封装了jar包,sql是作为参数动态传入的, 因此需要动态的引入一下依赖jar,比如udf jar,connector的jar等, 由于不同任务的依赖jar是不同的,不适合都放在flink lib目录下(可能会冲突) 因此想请教一下有没有办法在submit时才指定任务依赖的jar包,类似spark submit的--jars 没有的话有没有相关的issue可以跟进这个问题 谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink任务挂掉后自动重启
说一下我们平台的实现方式 1、自定义metricReporter,假如任务开启了checkpoint,reporter会自动的将最新完成的checkpoint路径进行上报 可参考https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#checkpointing 2、平台会有是否重试和是否基于checkpoint进行恢复的选项 3、假如上述两选项都开启了之后,可以对运行失败的任务基于最新的checkpoint进行拉起 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 官方后续会有支持kafka lag metric的计划吗
hi zhisheng 我找到两篇相关的参考博客你看一下 https://blog.csdn.net/a1240466196/article/details/107853926 https://www.jianshu.com/p/c7515bdde1f7 -- Sent from: http://apache-flink.147419.n8.nabble.com/
官方后续会有支持kafka lag metric的计划吗
目前消费kafka会有lag的情况发生,因此想基于flink metric进行上报监控kakfa的消费延时情况 主要是两种情况: 1、当前group消费的offset和对应topic最大offset之间的差值,也就是积压的数据量 2、当前消费的最新记录的timestamp和系统时间之间的差值,也就是消费的时间延时 kafka lag的监控对实时任务的稳定运行有着非常重要的作用, 网上也检索到了一些基于源码修改的实现,但修改源码的话也不利于后面flink版本的升级,还是希望官方可以考虑支持一下 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.11里如何parse出未解析的执行计划
我简单写了一下仅供参考 import org.apache.calcite.config.Lex; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; import org.apache.flink.sql.parser.validate.FlinkSqlConformance; /** * @author: silence * @date: 2020/10/22 */ public class Test { public static void main(String[] args) throws SqlParseException { String sql = "xxx"; SqlParser.Config sqlParserConfig = SqlParser .configBuilder() .setParserFactory(FlinkSqlParserImpl.FACTORY) .setConformance(FlinkSqlConformance.DEFAULT) .setLex(Lex.JAVA) .setIdentifierMaxLength(256) .build(); SqlParser sqlParser = SqlParser.create(sql, sqlParserConfig); SqlNodeList sqlNodes = sqlParser.parseStmtList(); for (SqlNode sqlNode : sqlNodes) { //do something } } } -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢
也可以通过普通的非窗口聚合进行实现吧,minibatch设大点 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: [SQL] parse table name from sql statement
我写过一个类似的可以参考一下 private static List lookupSelectTable(SqlNode sqlNode) { List list = new ArrayList<>(); if (sqlNode instanceof SqlSelect) { SqlNode from = ((SqlSelect) sqlNode).getFrom(); list.addAll(lookupSelectTable(from)); } else if (sqlNode instanceof SqlJoin) { SqlJoin sqlJoin = (SqlJoin) sqlNode; list.addAll(lookupSelectTable(sqlJoin.getLeft())); list.addAll(lookupSelectTable(sqlJoin.getRight())); } else if (sqlNode instanceof SqlBasicCall) { SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode; SqlOperator operator = sqlBasicCall.getOperator(); if (SqlKind.AS.equals(operator.getKind())) { list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0])); } else if (SqlKind.UNION.equals(operator.getKind())) { for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) { list.addAll(lookupSelectTable(operandSqlNode)); } } else { throw new RuntimeException("operator " + operator.getKind() + " not support"); } } else if (sqlNode instanceof SqlIdentifier) { list.add(((SqlIdentifier) sqlNode).getSimple()); } else { throw new RuntimeException("operator " + sqlNode.getClass() + " not support"); } return list; } -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: [SQL] parse table name from sql statement
写过一个类似的可以参考一下 private static List lookupSelectTable(SqlNode sqlNode) { List list = new ArrayList<>(); if (sqlNode instanceof SqlSelect) { SqlNode from = ((SqlSelect) sqlNode).getFrom(); list.addAll(lookupSelectTable(from)); } else if (sqlNode instanceof SqlJoin) { SqlJoin sqlJoin = (SqlJoin) sqlNode; list.addAll(lookupSelectTable(sqlJoin.getLeft())); list.addAll(lookupSelectTable(sqlJoin.getRight())); } else if (sqlNode instanceof SqlBasicCall) { SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode; SqlOperator operator = sqlBasicCall.getOperator(); if (SqlKind.AS.equals(operator.getKind())) { list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0])); } else if (SqlKind.UNION.equals(operator.getKind())) { for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) { list.addAll(lookupSelectTable(operandSqlNode)); } } else { throw new RuntimeException("operator " + operator.getKind() + " not support"); } } else if (sqlNode instanceof SqlIdentifier) { list.add(((SqlIdentifier) sqlNode).getSimple()); } else { throw new RuntimeException("operator " + sqlNode.getClass() + " not support"); } return list; } -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-CDC client 一对多问题
可以写一个group_array的udaf select * from aa as a left join ( select userId,group_array(row(userId, userBankNo, userBankNo)) from bb group by userId ) as b where a.userId=b.userId -- Sent from: http://apache-flink.147419.n8.nabble.com/
请教大家如何注册支持多返回值类型的UDAF
如题,最近想实现一些类似于LAST_VALUE之类的UDAF,看了官网文档自己写了一下目前有以下一些疑问: 1、聚合结果需要重写AggregateFunction的getValue方法,而该方法需要返回固定的数据类型,如果要实现不同返回值的UDAF是否需要进行多个实现? 2、如果是需要多个实现类的话如何注册到同一个方法名上?测试发现后注册的UDAF会覆盖之前的注册,也就是只有最后注册的UDAF生效,还是只能支持一种数据类型 3、看了源码中的aggFuction的注册过程,发现也是对不同的数据类型进行了多次实现,然后在使用时根据参数的类型进行不同的实现类的创建,最后的疑问就是现有基于现有的flink api如果实现类似的效果呢? 感谢大佬们的解答
Re: flink sql执行sql语句无法执行的错误-No operators defined in streaming topology. Cannot execute.
没有insert语句也就是没有sink无法触发计算 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
个人理解有几种实现方案 1、通过主键加LAST_VALUE()使用最新的记录进行计算 2、通过flink-cdc connector source 3、自己根据操作类型写计算逻辑 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复: flink-sql 1.11版本都还没完全支持checkpoint吗
手动停止再恢复的话需要启动时通过 (-s 上一次checkpoint的mate路径)进行恢复 https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/check points.html#resuming-from-a-retained-checkpoint -邮件原件- 发件人: 凌天荣 <466792...@qq.com> 发送时间: 2020年9月8日 15:50 收件人: user-zh 主题: flink-sql 1.11版本都还没完全支持checkpoint吗 代码里设置了enableCheckpointing,任务停掉后,重启,还是没能消费停掉期间的数 据,也就是checkpoint没生效