Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Benchao Li
Hi Peihui, 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理, 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以 从结果上来看,*还不能完全做到原封不动的输出到下游*。 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题: 1. 用RAW类型,这个需要json

回复:flink 1.11 on kubernetes 构建失败

2020-07-09 文章 SmileSmile
hi Yang 在1.10版本,running的作业点击拓普图中随便一个operation,有detail subtasks taskmanagers xxx x 这行,taskmanagers这栏里的host,显示的是 podname:端口 在1.11变成ip:端口 目前我这边遇到的情况是,构建了一个有120slot的集群,作业并行度是120。 提交到jm后jm就失联了,jm timeout。观察jm日志,疯狂在刷 No hostname could be resolved for the IP address 10.35.160.5, using IP address

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 Dian Fu
好的,针对你这个case,这个是个已知问题:https://issues.apache.org/jira/browse/FLINK-15973 ,暂时还没有修复。 你可以这样改写一下,应该可以绕过去这个问题: table = st_env.scan("source") \ .where("action === 'Insert'") \

Re: Flink DataStream 统计UV问题

2020-07-09 文章 tison
你这个需求貌似是要看一天的 UV 的实时更新量,可以看一下 sliding window。如果是每天 0 点清零,实时看今天的 UV,那就是另一个问题了,应该需要自己定义 trigger & evictor 每条触发一次 window...看你数据量吧 Best, tison. shizk233 于2020年7月10日周五 上午10:23写道: > Hi Jiazhi, > > >

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Jark Wu
社区有个 issue 正在解决这个问题,可以关注一下 https://issues.apache.org/jira/browse/FLINK-18002 Best, Jark On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote: > Hi, Peihui > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现 > 就是按照json的标准格式解析(jackson)的,没法将一个 >

Re:Re: flink 写入es失败导致数据丢失

2020-07-09 文章 sunfulin
hi, 感谢两位的回复。我先来本地复现下。如果有问题的话,会建个issue。 在 2020-07-10 11:43:33,"Congxian Qiu" 写道: >Hi > >从官方文档的配置[1] 来看,对于 handle failure 来说,默认是 fail,也就是说 request 失败了会导致作业失败的,可以尝试在 >log 中看能否找到这个日志,或者显示的设置成 fail 看看。如果发现 handle failure 是 fail 的情况下不符合预期,可以想 >Leonard 说的那样建立一个 issue 来追踪这个问题 > >[1]

Re: flink 1.11 on kubernetes 构建失败

2020-07-09 文章 Yang Wang
我记得1.11里面对host这个地方应该是没有改动,taskmanager.network.bind-policy的 默认值一会都是ip。所以你说的UI上是podname,这个是哪里的?正常TM列表akka地址 都是ip地址的 Best, Yang SmileSmile 于2020年7月10日周五 上午10:42写道: > hi yang wang > > 1.11版本的on kubernetes在hostname上有做什么变化吗? > > 作业运行的时候 flink ui上 tm变成ip:端口 > ,在1.10版本,ui上是 podname:端口。 > >

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 m...@sinoiov.com
hi:Congxian qiu: topic没问题,用kafka指令创建,其他应用也能写入,我换一个kafka集群也不行 马琪 研发中心 北京中交兴路车联网科技有限公司 T. 010-50822710 M. 13701177502 F. 010-50822899 E. m...@sinoiov.com 地址:北京市海淀区东北旺西路8号中关村软件园27号院千方科技大厦A座(100085) 发件人: Congxian Qiu 发送时间: 2020-07-10 10:20 收件人: user-zh 主题: Re: Re:

Re: flink 写入es失败导致数据丢失

2020-07-09 文章 Congxian Qiu
Hi 从官方文档的配置[1] 来看,对于 handle failure 来说,默认是 fail,也就是说 request 失败了会导致作业失败的,可以尝试在 log 中看能否找到这个日志,或者显示的设置成 fail 看看。如果发现 handle failure 是 fail 的情况下不符合预期,可以想 Leonard 说的那样建立一个 issue 来追踪这个问题 [1]

Re: flink 写入es失败导致数据丢失

2020-07-09 文章 Leonard Xu
Hi, 我理解作业应该失败才对,你本地可以复现吗?可以复现的话可以在社区建个issue。 Best, Leonard Xu > 在 2020年7月10日,11:20,sunfulin 写道: > > ,但是作业确实没失败。

Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 文章 Leonard Xu
Hello,Zach >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: >>> Could not find a suitable table factory for >>> 'org.apache.flink.table.factories.TableSourceFactory' in >>> the classpath. >>> >>> >>> Reason: Required context properties mismatch. 这个错误,一般是SQL 程序缺少了SQL

Re:Re: flink 写入es失败导致数据丢失

2020-07-09 文章 sunfulin
hi,Leonard 是的。es集群服务不可用。我能观察到写入es失败,但是作业确实没失败。等到es集群服务恢复后,作业也正常了,但是故障期间的数据有丢失。 在 2020-07-10 11:16:17,"Leonard Xu" 写道: >Hello, fulin > >> es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。 > >es 服务挂掉是指es 集群不可用吗?那这时应该是写入es应该失败,作业也会失败,你说的没有积压是指什么呢? > >Best >Leonard Xu

Re: flink 写入es失败导致数据丢失

2020-07-09 文章 Leonard Xu
Hello, fulin > es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。 es 服务挂掉是指es 集群不可用吗?那这时应该是写入es应该失败,作业也会失败,你说的没有积压是指什么呢? Best Leonard Xu

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Leonard Xu
Hi, Peihui 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现 就是按照json的标准格式解析(jackson)的,没法将一个 jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 一种做法是定义复杂的jsonObject对应的ROW

Re: Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 文章 Shuiqiang Chen
Hi, 看样子是kafka table source没有成功创建,也许你需要将 org.apache.flink flink-sql-connector-kafka_2.11 ${flink.version} 这个jar 放到 FLINK_HOME/lib 目录下 Congxian Qiu 于2020年7月10日周五 上午10:57写道: > Hi > > 从异常看,可能是某个包没有引入导致的,和这个[1]比较像,可能你需要对比一下需要的是哪个包没有引入。 > > PS 从栈那里看到是 csv

Re:Re: flink 写入es失败导致数据丢失

2020-07-09 文章 sunfulin
hi 我使用社区默认的ES,主要配置如下:我使用flink 1.10.1,blink-planner。使用了ES6的sink。 我看了下文档,默认有个参数是 connector.failure-handler,是fail。我也能在TM日志里看到连接es失败的报错,但是整个任务checkpoint并没有失败。数据丢了。 WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '', 'connector.hosts' = '', 'connector.index' =

Re: flink 写入es失败导致数据丢失

2020-07-09 文章 Congxian Qiu
Hi 你 ES Sink 是自己写的,还是用的社区的呢?社区的使用了哪个版本,以及配置是啥样的呢 Best, Congxian sunfulin 于2020年7月10日周五 上午10:51写道: > hi, > > 我们现在使用flink消费kafka数据写到es,今天发现在默认设置下,es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。

Re: Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 文章 Congxian Qiu
Hi 从异常看,可能是某个包没有引入导致的,和这个[1]比较像,可能你需要对比一下需要的是哪个包没有引入。 PS 从栈那里看到是 csv 相关的,可以优先考虑下 cvs 相关的包 ``` The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory

flink 写入es失败导致数据丢失

2020-07-09 文章 sunfulin
hi, 我们现在使用flink消费kafka数据写到es,今天发现在默认设置下,es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。

Re:Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 文章 Zhou Zach
日志贴全了的,这是从yarn ui贴的full log,用yarn logs命令也是这些log,太简短,看不出错误在哪。。。 我又提交了另外之前用flink1.10跑过的任务,现在用flink1.11跑,报了异常: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J:

Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 文章 Congxian Qiu
Hi 这个看上去是提交到 Yarn 了,具体的原因需要看下 JM log 是啥原因。另外是否是日志没有贴全,这里只看到本地 log,其他的就只有小部分 jobmanager.err 的 log。 Best, Congxian Zhou Zach 于2020年7月9日周四 下午9:23写道: > hi all, > 原来用1.10使用per job模式,可以提交的作业,现在用1.11使用应用模式提交失败,看日志,也不清楚原因, > yarn log: > Log Type: jobmanager.err > > > Log Upload Time: Thu Jul 09

Re: Flink DataStream 统计UV问题

2020-07-09 文章 shizk233
Hi Jiazhi, 1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。 2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL Best, shizk233 ゞ野蠻遊戲χ 于2020年7月7日周二 下午10:27写道: > 大家好! > > 想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题: >

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 Congxian Qiu
Hi 从 org.apache.kafka.common.errors.InvalidTopicException: 这个异常来看,是 topic invalid 导致,具体的可以看一下 InvalidTopicException 的介绍[1], 这上面说的有可能是 名字太长,或者有非法字符等,这也可以查看一下 [1] https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/2.0.0/org/apache/kafka/common/errors/InvalidTopicException.html Best, Congxian

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 m...@sinoiov.com
hi ,LakeShen 对,测试环境包括yarn集群和kafka集群,他们想联通的 配置的是测试环境的kafka broker的地址 road.kafka.brokers=172.17.47.134:9092,172.17.47.135:9092,172.17.47.136:9092 road.kafka.topic=road-map road.kafka.group.id=ins-001 road.kafka.transaction.timeout.ms=30 马琪 研发中心 北京中交兴路车联网科技有限公司 T. 010-50822710

Re: flink时间窗口

2020-07-09 文章 Congxian Qiu
Hi 对于这个问题,可以尝试看添加相关日志能否在线上(或者测试环境)排查,另外可以使用 watermark 相关的 metric[1] 查看下是否符合预期 如果上面的不行,可以尝试看能否在 IDE 中进行复现,这样可以 debug 进行追查 [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io Best, Congxian 忝忝向仧 <153488...@qq.com> 于2020年7月9日周四 下午11:36写道: > new

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 LakeShen
你的 Yarn 环境,Flink 任务使用的 Kafka 地址,应该是 Yarn 环境的 kafka broker 地址。 LakeShen 于2020年7月10日周五 上午10:08写道: > Hi, > > 从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。 > > 这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。 > > Best, > LakeShen > > m...@sinoiov.com 于2020年7月9日周四

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 lgs
谢谢提示。 我打印出来explain,发现确实调用了两次udf,条件是那个eventtime.isNotNull: st_env.scan("source") \ .where("action === 'Insert'") \ .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \ .group_by("hourlywindow") \ .select("action.max as action1,

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 LakeShen
Hi, 从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。 这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。 Best, LakeShen m...@sinoiov.com 于2020年7月9日周四 下午9:21写道: > hi:zhisheng: > > 这是TM日志,在这之前没有任何错误日志, > > 代码逻辑很简单: > SingleOutputStreamOperator> > sourceStream =

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 LakeShen
Hi Peihui, 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: { "a":"b", "c":{ "d":"e", "g":"f" } }, 那么在 kafka table source 可以使用 row 来定义: create table xxx ( a varchar, c row ) 如果 还存在嵌套,可以继续再使用 Row 来定义。 Best, LakeShen Peihui He 于2020年7月10日周五 上午9:12写道: > Hello: > >

Re: pyflink1.11.0window

2020-07-09 文章 Shuiqiang Chen
琴师你好, 你的source ddl里有指定time1为 time attribute吗? create table source1( id int, time1 timestamp, type string, WATERMARK FOR time1 as time1 - INTERVAL '2' SECOND ) with (...) 奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月10日周五 上午8:43写道: >

flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Peihui He
Hello: 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 Best wishes.

flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-09 文章 Jun Zhang
大家好: 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 public static void main(String[] args) throws Exception{ StreamExecutionEnvironment bsEnv =

??????pyflink1.11.0window

2020-07-09 文章 ??????????????
---- ??: "??" <1129656...@qq.com;

??????flink????????

2020-07-09 文章 ????????
new ProcessWindowFunction? ??? ---- ??: "user-zh"

flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 文章 Zhou Zach
hi all, 原来用1.10使用per job模式,可以提交的作业,现在用1.11使用应用模式提交失败,看日志,也不清楚原因, yarn log: Log Type: jobmanager.err Log Upload Time: Thu Jul 09 21:02:48 +0800 2020 Log Length: 785 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 m...@sinoiov.com
hi:zhisheng: 这是TM日志,在这之前没有任何错误日志, 代码逻辑很简单: SingleOutputStreamOperator> sourceStream = env.addSource(source) .setParallelism(2) .uid("DataProcessSource") .flatMap(new DataConvertFunction()) .setParallelism(2) .uid("DataProcessDataCovert")

Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 zhisheng
hi,maqi 有完整的日志吗?在这个异常之前还有其他的异常信息吗?如果有,可以提供一下! Best, zhisheng m...@sinoiov.com 于2020年7月9日周四 下午7:57写道: > > 请教各位: > flink任务在本机写入测试环境kafka集群没问题, > > 但是上传到yarn环境,就是写不进去,其他job运行在yarn可以写入测试环境的kafka > > 异常信息如下: > > 2020-07-09 19:17:33,126 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 Dian Fu
Table API的作业在执行之前会经过一系列的rule优化,最终的执行计划,存在一个UDF调用多次的可能,你可以把执行计划打印出来看看(TableEnvironment#explain)。 具体原因,需要看一下作业逻辑。可以发一下你的作业吗?可重现代码即可。 > 在 2020年7月9日,下午5:50,lgs <9925...@qq.com> 写道: > > Hi, > > 我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。 >

pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 lgs
Hi, 我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。 log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了? 2020-07-09 17:44:17,501 INFO flink_test_stream_time_kafka.py:22 [] - start to ad 2020-07-09 17:44:17,530 INFO

Re:flink 双流join报错,java.lang.AssertionError

2020-07-09 文章 sunfulin
hi, 我切到最新的1.11 release版本,跑同样的sql,没有抛出异常。想问下这有相关的issue么?想确认下原因。 在 2020-07-09 16:53:34,"sunfulin" 写道: >hi, >我使用flink 1.10.1 >blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select > 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么? > > >select

flink 提交 offset 到 kafka

2020-07-09 文章 liangji
flink:1.6.2(部分集群未升级。) kafka:0.11 作业从kafka中消费消息,并运行在yarn上,提供的作业未配置checkpoint,autoCommit设置为true。 作业刚启动时通过kafka-console-consumer.sh可以正常观察到提交的offset,大概50分钟左右,通过kafka-console-consumer.sh就看不到相应的offset信息了(期间没有新消息),请问下flink是有什么机制吗?另外在flink web ui中看到的committed-offset metric一直显示的是

flink 双流join报错,java.lang.AssertionError

2020-07-09 文章 sunfulin
hi, 我使用flink 1.10.1 blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么? select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, A.fund_name as product_name, cast(B.balance as double) as balance

回复:ddl es 报错

2020-07-09 文章 Evan
Hello, 这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作 真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。 而tableEnv.toRetractStream(table, Row.class).print(); 这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。 2020年7月9日15:31:56 --原始邮件-- 发件人:"出发"<573693...@qq.com;

Re: kafka connector问题

2020-07-09 文章 Benchao Li
首先,从checkpoint/savepoint 恢复的话,一定会以checkpoint/savepoint中的offset为准,所以它的优先级是最高的, 不管你配置哪种startup mode。 如果你没有开启checkpoint,那么如果你用了group-offsets,那它就会从保存在kafka中的offset进行启动。 提交offset到kafka这个应该是默认就开了的。 op <520075...@qq.com> 于2020年7月9日周四 上午11:25写道: > 官网给的kafka table配置里的scan.startup.modeCREATE TABLE

??????DataStream????uv????

2020-07-09 文章 Yichao Yang
Hi, ??uv??uv[1]?? [1]https://lists.apache.org/thread.html/rbe00ee38e2d07310d4e3c796de86c65205d1f5deecfc1678d9ebbdea%40%3Cuser-zh.flink.apache.org%3E

Re: 回复:flink时间窗口

2020-07-09 文章 Congxian Qiu
对于 window 来说,你需要判断下是没有数据进来,还是有数据进来但是 window 没有触发。 如果是数据没有进来,那么需要看 window 节点之前的逻辑,如果是数据进来了,但是没有触发,需要看下 wateramrk 是不是符合预期 Best, Congxian 爱吃鱼 于2020年7月9日周四 下午1:42写道: > > > > Hi, > > > > > > > 因为业务原因具体的keyby字段没有写清楚,我是根据warningPojo类里面的字段进行排序,源数据 > 是从kafka实时流传输过来的,每一分钟滑动窗口计算一次 > > >