Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
看起来你的DDL写的没有什么问题。 你用的是哪个Flink版本呢? 此外就是可以发下更完整的异常栈么? zilong xiao 于2020年11月24日周二 下午2:54写道: > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ > > Benchao Li 于2020年11月24日周二 下午2:49写道: > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 > > > > zilong xiao 于2020年11月24日周二 上午10:49写道: > > > > > [image:

Re: 退订

2020-11-23 文章 Xingbo Huang
Hi, 退订请发邮件到 user-zh-unsubscr...@flink.apache.org 详细的可以参考 [1] [1] https://flink.apache.org/zh/community.html#section-1 Best, Xingbo 李军 于2020年11月24日周二 下午2:49写道: > >

Re: ProcessingTime下的watermark

2020-11-23 文章 Xingbo Huang
Hi, watermark是对于数据的eventTime没有顺序到来帮助何时触发计算用的,你如果用processingTime来,processingTime肯定是递增的,就不存在乱序这个概念了,就不需要watermark了。 Best, Xingbo Kyle Zhang 于2020年11月24日周二 下午1:34写道: > Hi, > 使用flink1.11,在SQL ddl中基于process time声明watermark报错 > > SQL validation failed. Watermark can not be defined for a processing

FlinkSQL导致Prometheus内存暴涨

2020-11-23 文章 Luna Wong
FlinkSQL 生成的Metrics数据 task_name名字超长,导致Prometheus查询的时候内存暴涨,生产环境接受不了。 下面只是一个最简单的例子,复杂的SQL生成的task_name长达9000字节。这会导致Prometheus内存暴涨,我该怎么办。

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 zilong xiao
Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ Benchao Li 于2020年11月24日周二 下午2:49写道: > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 > > zilong xiao 于2020年11月24日周二 上午10:49写道: > > > [image: image.png] > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > > > > > -- > > Best, > Benchao Li >

退订

2020-11-23 文章 李军

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 zilong xiao 于2020年11月24日周二 上午10:49写道: > [image: image.png] > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > -- Best, Benchao Li

flink table api 或者 sql 使用 自定义含有的state方法

2020-11-23 文章 戴嘉诚
大家好: 请问,因为当前flink sql或者flink table 中,不支持自定义的udf中使用有state的逻辑,所以,当我们自己任务中,如果统计需要聚集型指标的情况下,就不能用上flink sql了,只能自己使用flink datastream去硬编码,请问,flink sql中,能否有其他方式,可以调用我们自己定义的有state的udf,并且可以不让再解析执行的时候,多次出现呢?还是说,只能一个指标一个flink job?

Re: 关于Catalog的建议

2020-11-23 文章 admin
感谢jark大佬,试过了确实可以 我是先用hive的catalog+dialect 建了 hive表, 然后切换到default catalog 建了 kafka source表, 在insert into hive select from kafka时需要指定hive_catalog.hive_db.hive_table,否则会报表不存在,因为当前是在default catalog 下。大家注意一下 > 2020年11月24日 上午11:41,Jark Wu 写道: > > 1. 可以的 > 2. 是的。见文档 >

ProcessingTime下的watermark

2020-11-23 文章 Kyle Zhang
Hi, 使用flink1.11,在SQL ddl中基于process time声明watermark报错 SQL validation failed. Watermark can not be defined for a processing time attribute column. 文档里关于watermark的解释也基本是跟eventTime在一起[1] 我想问的是基于processingTime的流处理是不需要watermark,还是被flink优化,不需要我们关心? [1]

Re:Flink SQL 对延迟数据怎么处理?

2020-11-23 文章 hailongwang
Hi, 据我所知,FlinkSQL 不支持将迟到的数据输出到侧流中。 如果你下游使用的是 window 的话,可以通过设置 `table.exec.emit.late-fire.enabled` 和 `table.exec.emit.late-fire.delay` 来触发晚于 watermark 到达的数据。 其中允许等待晚与 watermark 的数据的时间由 `table.exec.state.ttl` 控制,等价于 Datastream 中的 allowedLateness, 故 window 的最大等待时间为 watermark 的 outOfOrder +

Re:关于flink实时写入hbase用flush方法频繁报操作超时问题

2020-11-23 文章 hailongwang
这个错误感觉是 Hbase 的错误。具体实现的话,你可以参考社区的 HBaseSinkFunction[1] 的实现。 [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java Best, Hailong 在 2020-11-24 09:32:55,"bradyMk" 写道: >请教各位:

Re:flink sql 中是否可以使用 mysql 的存储过程和函数?

2020-11-23 文章 hailongwang
Hi, 不可以的,其中链接[1] 是Flink SQL 支持的所有内置函数,链接[2] 是 Flink SQL 允许自己定义函数,来满足个性化需求。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html Best, Hailong 在 2020-11-24

flink使用hive udf函数

2020-11-23 文章 酷酷的浑蛋
Flink-1.11.1, hive-2.2.0 在使用current_timestamp或者current_date函数时会报 Caused by: java.lang.NullPointerException at org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51) at

Re: Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 jy l
我使用的是release-1.12.0-rc1 Best Jark Wu 于2020年11月24日周二 上午11:42写道: > 看报错像是一个 bug。 请问使用的是哪个版本呢? > 可以去 JIRA issue 提个 issue。 > > Best, > Jark > > On Tue, 24 Nov 2020 at 11:27, jy l wrote: > > > Hi: > > FlinkSQL我在使用时发生一件很诡异的事件。具体如下: > > > > 我的DDL: > > create table if not exists t_order( > > id int

Re: Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 Jark Wu
看报错像是一个 bug。 请问使用的是哪个版本呢? 可以去 JIRA issue 提个 issue。 Best, Jark On Tue, 24 Nov 2020 at 11:27, jy l wrote: > Hi: > FlinkSQL我在使用时发生一件很诡异的事件。具体如下: > > 我的DDL: > create table if not exists t_order( > id int PRIMARY KEY comment '订单id', > timestamps bigint comment '订单创建时间', > orderInformationId string

Re: 关于Catalog的建议

2020-11-23 文章 Jark Wu
1. 可以的 2. 是的。见文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/use.html#use-catloag 3. 是的。 Hive metastore catalog 就是 Flink 官方提供的通用 catalog(可以存任何 connector 类型)。 Best, Jark On Tue, 24 Nov 2020 at 10:58, admin <17626017...@163.com> wrote: > Hi Rui Li, > >

Re: 关于flink实时写入hbase用flush方法频繁报操作超时问题

2020-11-23 文章 bradyMk
补充下上个问题中图片的文字版: 图一: if (count > 300) { mutator.flush() count = 0 } count = count + 1 图二: Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 101 actions: Operation Timeout: 101 times, servers with issues:

Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 jy l
Hi: FlinkSQL我在使用时发生一件很诡异的事件。具体如下: 我的DDL: create table if not exists t_order( id int PRIMARY KEY comment '订单id', timestamps bigint comment '订单创建时间', orderInformationId string comment '订单信息ID', userId string comment '用户ID', categoryId int comment '商品类别', productId int comment '商品ID', price

Re: 关于Catalog的建议

2020-11-23 文章 admin
Hi Rui Li, > FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。 一个job里面可以切换catalog的是吧,比如从读kafka中 写hive 的 db1.hive_table。 几个问题请教一下: 1.create kafka source 使用 memory catalog,hive table 使用hive catalog,这样是可以的吧 2.在sql里面切换catalog的语法是什么,在[1]里面没看到,是这样吗 USE CATALOG

Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 zilong xiao
[image: image.png] 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。

关于flink实时写入hbase用flush方法频繁报操作超时问题

2020-11-23 文章 bradyMk
请教各位: 我用flink实时写入hbase,继承RichSinkFunction后用的hbase的BufferedMutator,每当写入一定量的数据后,就用flush的方法,类似这样: 但是我的任务会频繁报出如下错误:

Flink SQL 对延迟数据怎么处理?

2020-11-23 文章 jy l
Hi: 请教一下,FlinkSQL中,我在创建表时设置了watermark并设置了最大延迟,可是还是有数据依旧会迟到晚到,对于这样的数据我们又不想直接丢弃,那这个依旧迟到的数据我该怎么收集?是否有与StreamAPI一样可以将依旧迟到的数据进行分流的方案? 祝好!

flink sql 中是否可以使用 mysql 的存储过程和函数?

2020-11-23 文章 macdoor
需求是这样,mysql中使用 binary(16) 存储 uuid,读取到 flink中需要转换成文本串的uuid,sql是这样 select bin_to_uuid(id, true) as text_uuid from usertable 我尝试使用,报错说 bin_to_uuid 找不到 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

2020-11-23 文章 macdoor
自己回答一下,供其他人参考。 换成flink 1.12.0-rc1,用相同sql处理相同数据,结果跟 hive 计算的结果相同,确认是 1.11.2 的一个bug,1.12应该已经改正了 -- Sent from: http://apache-flink.147419.n8.nabble.com/

statementset下source怎么完全复用

2020-11-23 文章 Jeff
请问一下,flink 1.11statement set 怎么复用同一个source呢? 希望同一个job里不同sink使用完全相同的数据,不是默认的用hash分流,这个有地方设置么?

关于Catalog的建议

2020-11-23 文章 赵一旦
目前Flink提供memory、jdbc、hive这3种catalog。 感觉实际使用中,可以使用如下几种方案。 (1)选择memory catalog,然后每次sql都带上自己的相关DDL。 (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。 方案1和方案2各有优缺点。 方案1的优点: 比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka

Re: flink读mysql分库分表

2020-11-23 文章 Leonard Xu
Hi, 我没理解错的话你是想一次读出所有表(分库分表)的所有数据, 用一个DDL建表语句搞定,目前还不支持 祝好, Leonard > 在 2020年11月23日,17:22,酷酷的浑蛋 写道: > > > > flink读mysql分库分表可以自动识别吗? 还是只能一个一个读? >

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 赵一旦
哦哦, 好吧,我一直以为你说的“新旧”是是否指定了update-mode。理解错了。 good,那应该没问题了,我去改改。 Jark Wu 于2020年11月23日周一 下午5:18写道: > 你用的还是老的 connector 吧?也就是 'connector.type' = 'jdbc'。这个是根据 query > 是否有更新来决定工作模式的。 > 如果你用新的 connector,也就是 'connector' = 'jdbc',这个的行为就是你说的第 (2) 种行为。 > > Best, > Jark > > On Mon, 23 Nov 2020 at 17:14,

Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-23 文章 赵一旦
@Yun Tang,应该是这个问题。 请教下这几个参数具体含义。 backoff in milliseconds for partition requests of input channels 是什么逻辑,以及initial和max分别表达含义。 akka.ask.timeout这个参数相对明显,就是超时,这个以前也涉及过,在cancel/submit/savepoint等情况都可能导致集群slot陆续没掉,然后再陆续回来(pass环境,基本就是部分机器失联,然后重新连接的case)。 Yun Tang 于2020年11月23日周一 下午5:11写道: > Hi > >

flink读mysql分库分表

2020-11-23 文章 酷酷的浑蛋
flink读mysql分库分表可以自动识别吗? 还是只能一个一个读?

Re: pyflink 1.11.1 调用包含第三方依赖库的udf时报错

2020-11-23 文章 Xingbo Huang
Hi, 你可以帖下taskmanager的日志吗,这个日志只能看到启动Python进程的时候挂掉了,其他信息看不到。 Best, Xingbo fengmuqi...@ruqimobility.com 于2020年11月23日周一 下午4:11写道: > hi. > pyflink 1.11.1 调用包含第三方依赖库的udf时报错 : > > 运行环境: > windows10 > python==3.7.9 > apache-flink==1.11.1 > apache-beam==2.19.0 > > udf 依赖第三方库: > h3==3.7.0 > > pytest

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 Jark Wu
你用的还是老的 connector 吧?也就是 'connector.type' = 'jdbc'。这个是根据 query 是否有更新来决定工作模式的。 如果你用新的 connector,也就是 'connector' = 'jdbc',这个的行为就是你说的第 (2) 种行为。 Best, Jark On Mon, 23 Nov 2020 at 17:14, 赵一旦 wrote: > duplicate情况可能update pv =values(pv), 也可能 update pv = pv + > values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 赵一旦
duplicate情况可能update pv =values(pv), 也可能 update pv = pv + values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query' 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。 赵一旦 于2020年11月23日周一 下午5:09写道: > 总结下: > (1)group >

Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-23 文章 Yun Tang
Hi 集群负载比较大的时候,下游一直收不到request的partition,就会导致PartitionNotFoundException,建议增大 taskmanager.network.request-backoff.max [1][2] 以增大重试次数 [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-request-backoff-max [2]

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 赵一旦
总结下: (1)group by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。 (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 赵一旦
嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。 发现这种方式也不行,但是加了group by之后是可以的。 (1) 所以说是否还需要query带有key的语义才行呢? 比如group by的结果是可能update的,并且基于group by key也指出了key。 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢? (2)如JarkWu所说,是mysql表的DDL部分决定。

flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据

2020-11-23 文章 dpzhoufengdev
flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据,每条数据都有两条完全一样的数据。这个是什么原因导致的? 聚合计算的逻辑 Table tableoneHour = tableEnv.sqlQuery( "select appname" + ",productCode" + ",link" + ",count(case when nodeName =

flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据

2020-11-23 文章 dpzhoufengdev
flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据,每条数据都有两条完全一样的数据。这个是什么原因导致的? 聚合计算的逻辑 Table tableoneHour = tableEnv.sqlQuery( "select appname" + ",productCode" + ",link" + ",count(case when nodeName =

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 Jark Wu
这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。 Best, Jark On Mon, 23 Nov 2020 at 15:39, 赵一旦 wrote: > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到 > >

pyflink 1.11.1 调用包含第三方依赖库的udf时报错

2020-11-23 文章 fengmuqi...@ruqimobility.com
hi. pyflink 1.11.1 调用包含第三方依赖库的udf时报错 : 运行环境: windows10 python==3.7.9 apache-flink==1.11.1 apache-beam==2.19.0 udf 依赖第三方库: h3==3.7.0 pytest 通过。 运行时报错,报错信息如下 2020-11-23 14:20:51,656 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: