Flink 1.16 流表 join 的 FilterPushDown 及并行
有一张 kafka 流表 logClient(id int, name string, price double),一张实现了 SupportsFilterPushDown 的 customConnector 维表 product(id int, name string, value double),实现了自定义函数 MyUDF(double d1, double d2) 用于自定义逻辑计算并支持该算子的下推。 在 Stream-Table Join 的场景下,下列 SQL 并没有将算子进行下推,而是通过 TableScan 将所有算子提到 Join 节点进行计算,请问是否有什么选项可以开启下推?(类似与 nestedloop-join,计算推到 product 表数据源进行) SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE MyUDF(B.value, A.price) < xxx; 另外,Kafka 和 customConnector 均支持并行,在 Join 计算时期望使用 BROADCAST 模式,将 product 表在 logClient 流表的每个 partition 上进行计算,但似乎目前 Flink 流-表 Join 的 distribution 模式仅支持 SINGLETON 和 HASH[KEY](StreamExecExchange.java Line106 的 switch case),后续社区是否会考虑支持更多的 distributionType? 非常感谢!
退订
退订
Re: PartitionNotFoundException循环重启
Hi, PartitionNotFoundException异常原因通常是下游task向上游task发送partition request请求,但是上游task还没有部署成功。一般情况下,下游task会重试,超时后会报出异常。你可以查看下有没有其他的异常日志,查一下上游task为什么没有部署成功。 Best, Shammon FY On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com < zhan...@eastcom-sw.com> wrote: > > 异常日志内容 > > 2023-07-03 20:30:15,164 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: > Sink 3 (2/45) > (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_ > 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @ > nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769). > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: > Partition > 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_ > 3093 not found. > at > org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186) > ~[flink-dist-1.17.1.jar:1.17.1] > at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77] > at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77] > > > > 发件人: zhan...@eastcom-sw.com > 发送时间: 2023-07-04 09:25 > 收件人: user-zh > 主题: PartitionNotFoundException循环重启 > hi,我这有两个流量比较大的job(一天3亿/6亿),在启动正常运行了5、6天左右就会出现 > PartitionNotFoundException 的异常,然后不断的循环重启 > > 在flink-conf.yaml中添加以下参数后,也是同样在6天后会 循环报 PartitionNotFoundException > 的异常后,不断的重启 > taskmanager.network.tcp-connection.enable-reuse-across-jobs: false > taskmanager.network.max-num-tcp-connections: 16 > > 当前版本 1.17.1,同样的job跟数据在1.14.4中一直没问题,请问这个有什么办法解决么? > >