Re: Re: PartitionNotFoundException循环重启
Hi, 如果要增加request partition的重试时间,可以调整配置项`taskmanager.network.request-backoff.max`,默认是10秒,具体配置可以参阅[1] [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-taskmanageroptions Best, Shammon FY On Tue, Jul 4, 2023 at 11:38 AM zhan...@eastcom-sw.com < zhan...@eastcom-sw.com> wrote: > 从前面日志看是重启后从hdfs加载checkpoint数据处理(100M左右)这过程好像有点久,还有连kafka消费 > 下游的超时重试 可以设置次数或者时长吗? > > 发件人: Shammon FY > 发送时间: 2023-07-04 10:12 > 收件人: user-zh > 主题: Re: PartitionNotFoundException循环重启 > Hi, > > PartitionNotFoundException异常原因通常是下游task向上游task发送partition > > request请求,但是上游task还没有部署成功。一般情况下,下游task会重试,超时后会报出异常。你可以查看下有没有其他的异常日志,查一下上游task为什么没有部署成功。 > > Best, > Shammon FY > > On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com < > zhan...@eastcom-sw.com> wrote: > > > > > 异常日志内容 > > > > 2023-07-03 20:30:15,164 INFO > > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: > > Sink 3 (2/45) > > (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_ > > 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @ > > nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769). > > org.apache.flink.runtime.io > .network.partition.PartitionNotFoundException: > > Partition > > > 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_ > > 3093 not found. > > at org.apache.flink.runtime.io > .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70) > > ~[flink-dist-1.17.1.jar:1.17.1] > > at org.apache.flink.runtime.io > .network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136) > > ~[flink-dist-1.17.1.jar:1.17.1] > > at org.apache.flink.runtime.io > .network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186) > > ~[flink-dist-1.17.1.jar:1.17.1] > > at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77] > > at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77] > > > > > > > > 发件人: zhan...@eastcom-sw.com > > 发送时间: 2023-07-04 09:25 > > 收件人: user-zh > > 主题: PartitionNotFoundException循环重启 > > hi,我这有两个流量比较大的job(一天3亿/6亿),在启动正常运行了5、6天左右就会出现 > > PartitionNotFoundException 的异常,然后不断的循环重启 > > > > 在flink-conf.yaml中添加以下参数后,也是同样在6天后会 循环报 PartitionNotFoundException > > 的异常后,不断的重启 > > taskmanager.network.tcp-connection.enable-reuse-across-jobs: false > > taskmanager.network.max-num-tcp-connections: 16 > > > > 当前版本 1.17.1,同样的job跟数据在1.14.4中一直没问题,请问这个有什么办法解决么? > > > > >
Re: flink on native k8s里如何使用flink sql gateway
Hi, 我们的做法是启动Flink集群后,在其他节点(pod或者独立启动)启动Sql-Gateway,通过Flink的地址远程连接Flink集群,这样Sql-Gateway的部署和Flink集群完全分开 Best, Shammon FY On Tue, Jul 4, 2023 at 10:52 AM chaojianok wrote: > 大家好,请教个问题。 > > 用native kubernetes方式在k8s集群上部署好了flink,现在需要在这个flink集群里使用flink sql > gateway,大家有什么好的方案吗? > 目前的做法是,进入pod里启动sql gateway,然后在k8s创建flink-sql-gateway > service,这样就可以通过这个service来访问sql > gateway了,但是这个方法有个问题,部署过程中必需进入pod启服务,这是不利于自动化部署的,具体的操作命令如下,大家帮忙看看有没有好的解决方案来避免这个问题。 > > 1、创建flink集群 > ./bin/kubernetes-session.sh \ > -Dkubernetes.cluster-id=flink-cluster \ > -Dkubernetes.namespace=flink \ > -Dkubernetes.service-account=flink-service-account \ > -Dkubernetes.rest-service.exposed.type=NodePort > > 2、进入pod通过 ./bin/sql-gateway.sh start > -Dsql-gateway.endpoint.rest.address=localhost 启动sql gateway服务,退出pod > > 3、创建flink-sql-gateway service > kubectl expose deployment flink-cluster --type=NodePort --port=8083 > --name=flink-sql-gateway -n flink >
Re: Flink 1.16 流表 join 的 FilterPushDown 及并行
Hi, Chai Kelun, 你的 filter condition 里面包含了你自定义的 UDF,是不满足 filter push down 的条件的,因为对于优化器来说 UDF 是不确定的,优化器不能从里面提取到可以下推的条件, 如果你想实现下推,可以尝试抽取下确定性的 condition,如 product.id > 10 etc.。另外,Flink 是支持 broadcast hash join 的,如果你想控制某两个表的 join type,你可以通过 join hint 来指定 join 类型为 broadcast。() Chai Kelun 于2023年7月3日周一 17:58写道: > 有一张 kafka 流表 logClient(id int, name string, price double),一张实现了 > SupportsFilterPushDown 的 customConnector 维表 product(id int, name string, > value double),实现了自定义函数 MyUDF(double d1, double d2) 用于自定义逻辑计算并支持该算子的下推。 > 在 Stream-Table Join 的场景下,下列 SQL 并没有将算子进行下推,而是通过 TableScan 将所有算子提到 Join > 节点进行计算,请问是否有什么选项可以开启下推?(类似与 nestedloop-join,计算推到 product 表数据源进行) > SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE > MyUDF(B.value, A.price) < xxx; > 另外,Kafka 和 customConnector 均支持并行,在 Join 计算时期望使用 BROADCAST 模式,将 product 表在 > logClient 流表的每个 partition 上进行计算,但似乎目前 Flink 流-表 Join 的 distribution 模式仅支持 > SINGLETON 和 HASH[KEY](StreamExecExchange.java Line106 的 switch > case),后续社区是否会考虑支持更多的 distributionType? > > 非常感谢!
Re: flink1.17.1使用kafka source异常
Hi, aiden. 看起来是类冲突,按照官方的文档,使用 kafka 时,你应该是不需要引入 flink-core 和 flink-connector-base 的( https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/)。如果是因为其他原因需要使用这两个 jar, 你可以使用 mvn dependency::tree 查看一下 "org/apache/kafka/clients/consumer/ConsumerRecord" 是在哪里被重复加载进来,可以exclude 掉非 flink-connector-kafka 的这个类。 aiden <18765295...@163.com> 于2023年7月4日周二 14:23写道: > hi > > 在使用1.17.1版本kafka source时遇到如下异常: > Caused by: java.lang.LinkageError: loader constraint violation: loader > (instance of org/apache/flink/util/ChildFirstClassLoader) previously > initiated loading for a different type with name > "org/apache/kafka/clients/consumer/ConsumerRecord" > at java.lang.ClassLoader.defineClass1(Native Method) > at java.lang.ClassLoader.defineClass(ClassLoader.java:763) > at > java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) > at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) > at java.net.URLClassLoader.access$100(URLClassLoader.java:73) > at java.net.URLClassLoader$1.run(URLClassLoader.java:368) > at java.net.URLClassLoader$1.run(URLClassLoader.java:362) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:361) > at > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71) > at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.getDeclaredMethods0(Native Method) > at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) > at java.lang.Class.getDeclaredMethod(Class.java:2128) > at > java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629) > at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79) > at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520) > at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494) > at java.security.AccessController.doPrivileged(Native Method) > at java.io.ObjectStreamClass.(ObjectStreamClass.java:494) > at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391) > at > java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522) > at > org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) > ... 20 more > 以下是我的部分POM > org.apache.flink > flink-core > 1.17.1 > > > org.apache.flink > flink-connector-kafka > 1.17.1 > > > org.apache.flink > flink-connector-base > 1.17.1 > > > > 看起来像是类加载器异常,需要我修改哪些地方吗 >