Flink 1.16 流表 join 的 FilterPushDown 及并行

2023-07-03 Thread Chai Kelun
有一张 kafka 流表 logClient(id int, name string, price double),一张实现了 
SupportsFilterPushDown 的 customConnector 维表 product(id int, name string, value 
double),实现了自定义函数 MyUDF(double d1, double d2) 用于自定义逻辑计算并支持该算子的下推。
在 Stream-Table Join 的场景下,下列 SQL 并没有将算子进行下推,而是通过 TableScan 将所有算子提到 Join 
节点进行计算,请问是否有什么选项可以开启下推?(类似与 nestedloop-join,计算推到 product 表数据源进行)
SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE 
MyUDF(B.value, A.price) < xxx;
另外,Kafka 和 customConnector 均支持并行,在 Join 计算时期望使用 BROADCAST 模式,将 product 表在 
logClient 流表的每个 partition 上进行计算,但似乎目前 Flink 流-表 Join 的 distribution 模式仅支持 
SINGLETON 和 HASH[KEY](StreamExecExchange.java Line106 的 switch 
case),后续社区是否会考虑支持更多的 distributionType?

非常感谢!

退订

2023-07-03 Thread 周勃
退订

Re: PartitionNotFoundException循环重启

2023-07-03 Thread Shammon FY
Hi,

PartitionNotFoundException异常原因通常是下游task向上游task发送partition
request请求,但是上游task还没有部署成功。一般情况下,下游task会重试,超时后会报出异常。你可以查看下有没有其他的异常日志,查一下上游task为什么没有部署成功。

Best,
Shammon FY

On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

>
> 异常日志内容
>
> 2023-07-03 20:30:15,164 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink:
> Sink 3 (2/45)
> (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition
> 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_
> 3093 not found.
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
>
>
>
> 发件人: zhan...@eastcom-sw.com
> 发送时间: 2023-07-04 09:25
> 收件人: user-zh
> 主题: PartitionNotFoundException循环重启
> hi,我这有两个流量比较大的job(一天3亿/6亿),在启动正常运行了5、6天左右就会出现
> PartitionNotFoundException 的异常,然后不断的循环重启
>
> 在flink-conf.yaml中添加以下参数后,也是同样在6天后会 循环报 PartitionNotFoundException
> 的异常后,不断的重启
> taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> taskmanager.network.max-num-tcp-connections: 16
>
> 当前版本 1.17.1,同样的job跟数据在1.14.4中一直没问题,请问这个有什么办法解决么?
>
>