找到原因了,数据问题,两个kafka source的earliest的数据timestamp差距比较大,导致在join时一直在堆积数据等待另一个队列的时间戳到达。调整offset让两个队列的时间戳一致后问题消失。
guoliubi...@foxmail.com 发件人: guoliubi...@foxmail.com 发送时间: 2020-12-16 07:36 收件人: user-zh 主题: temparol table join后无法sink Hi, 流程是从两个kafka队列中取数据,做完temparol table join后取滚动窗口做UDAF,然后sink,代码大概如下 joined_table = t_env.sql_query(""" SELECT o.exchangeCode_ as code, o.price, o.volume, o.eventTime FROM orders AS o INNER JOIN quotes FOR SYSTEM_TIME AS OF o.eventTime q ON o.exchangeCode_ = q.exchangeCode_ """) tumble_window = Tumble.over(expr.lit(500).millis) \ .on(expr.col("eventTime")) \ .alias("w") aggregate_table = joined_table.window(tumble_window) \ .group_by("w") \ .select("orderCalc(code, price, volume) as aggValue") \ .execute_insert("kafkaSink") 然后执行的时候数据都堆积在TemporalJoin环节,没法进入sink环节。执行图如下 https://ftp.bmp.ovh/imgs/2020/12/702ccb600bb01968.png 最后sink环节的bytes received一直是0,然后运行到最后就因为内存不足失败。 看了taskmanager的日志里面没有报错。 想问下这种问题应该从哪里进行排查,多谢。 guoliubi...@foxmail.com