Re: flink1.17.1使用kafka source异常

2023-07-05 文章 yh z
Hi, aiden. 看起来是类冲突,按照官方的文档,使用 kafka 时,你应该是不需要引入 flink-core 和
flink-connector-base 的(
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/)。如果是因为其他原因需要使用这两个
jar, 你可以使用 mvn dependency::tree 查看一下
"org/apache/kafka/clients/consumer/ConsumerRecord" 是在哪里被重复加载进来,可以exclude 掉非
flink-connector-kafka 的这个类。

aiden <18765295...@163.com> 于2023年7月4日周二 14:23写道:

> hi
>
> 在使用1.17.1版本kafka source时遇到如下异常:
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
> initiated loading for a different type with name
> "org/apache/kafka/clients/consumer/ConsumerRecord"
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.getDeclaredMethods0(Native Method)
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
> at java.lang.Class.getDeclaredMethod(Class.java:2128)
> at
> java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
> at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.(ObjectStreamClass.java:494)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
> at
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
> ... 20 more
> 以下是我的部分POM   
> org.apache.flink
> flink-core
> 1.17.1
> 
> 
> org.apache.flink
> flink-connector-kafka
> 1.17.1
> 
> 
> org.apache.flink
> flink-connector-base
> 1.17.1
> 
>
>
> 看起来像是类加载器异常,需要我修改哪些地方吗
>


Re: Flink 1.16 流表 join 的 FilterPushDown 及并行

2023-07-05 文章 yh z
Hi, Chai Kelun, 你的 filter condition 里面包含了你自定义的 UDF,是不满足 filter push down
的条件的,因为对于优化器来说 UDF 是不确定的,优化器不能从里面提取到可以下推的条件, 如果你想实现下推,可以尝试抽取下确定性的
condition,如 product.id > 10 etc.。另外,Flink 是支持 broadcast hash join
的,如果你想控制某两个表的 join type,你可以通过 join hint 来指定 join 类型为 broadcast。()

Chai Kelun  于2023年7月3日周一 17:58写道:

> 有一张 kafka 流表 logClient(id int, name string, price double),一张实现了
> SupportsFilterPushDown 的 customConnector 维表 product(id int, name string,
> value double),实现了自定义函数 MyUDF(double d1, double d2) 用于自定义逻辑计算并支持该算子的下推。
> 在 Stream-Table Join 的场景下,下列 SQL 并没有将算子进行下推,而是通过 TableScan 将所有算子提到 Join
> 节点进行计算,请问是否有什么选项可以开启下推?(类似与 nestedloop-join,计算推到 product 表数据源进行)
> SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE
> MyUDF(B.value, A.price) < xxx;
> 另外,Kafka 和 customConnector 均支持并行,在 Join 计算时期望使用 BROADCAST 模式,将 product 表在
> logClient 流表的每个 partition 上进行计算,但似乎目前 Flink 流-表 Join 的 distribution 模式仅支持
> SINGLETON 和 HASH[KEY](StreamExecExchange.java Line106 的 switch
> case),后续社区是否会考虑支持更多的 distributionType?
>
> 非常感谢!


Re: flink on native k8s里如何使用flink sql gateway

2023-07-05 文章 Shammon FY
Hi,

我们的做法是启动Flink集群后,在其他节点(pod或者独立启动)启动Sql-Gateway,通过Flink的地址远程连接Flink集群,这样Sql-Gateway的部署和Flink集群完全分开

Best,
Shammon FY


On Tue, Jul 4, 2023 at 10:52 AM chaojianok  wrote:

> 大家好,请教个问题。
>
> 用native kubernetes方式在k8s集群上部署好了flink,现在需要在这个flink集群里使用flink sql
> gateway,大家有什么好的方案吗?
> 目前的做法是,进入pod里启动sql gateway,然后在k8s创建flink-sql-gateway
> service,这样就可以通过这个service来访问sql
> gateway了,但是这个方法有个问题,部署过程中必需进入pod启服务,这是不利于自动化部署的,具体的操作命令如下,大家帮忙看看有没有好的解决方案来避免这个问题。
>
> 1、创建flink集群
> ./bin/kubernetes-session.sh \
> -Dkubernetes.cluster-id=flink-cluster \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.service-account=flink-service-account \
> -Dkubernetes.rest-service.exposed.type=NodePort
>
> 2、进入pod通过 ./bin/sql-gateway.sh start
> -Dsql-gateway.endpoint.rest.address=localhost 启动sql gateway服务,退出pod
>
> 3、创建flink-sql-gateway service
> kubectl expose deployment flink-cluster --type=NodePort --port=8083
> --name=flink-sql-gateway -n flink
>


Re: Re: PartitionNotFoundException循环重启

2023-07-05 文章 Shammon FY
Hi,

如果要增加request
partition的重试时间,可以调整配置项`taskmanager.network.request-backoff.max`,默认是10秒,具体配置可以参阅[1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-taskmanageroptions

Best,
Shammon FY

On Tue, Jul 4, 2023 at 11:38 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> 从前面日志看是重启后从hdfs加载checkpoint数据处理(100M左右)这过程好像有点久,还有连kafka消费
> 下游的超时重试  可以设置次数或者时长吗?
>
> 发件人: Shammon FY
> 发送时间: 2023-07-04 10:12
> 收件人: user-zh
> 主题: Re: PartitionNotFoundException循环重启
> Hi,
>
> PartitionNotFoundException异常原因通常是下游task向上游task发送partition
>
> request请求,但是上游task还没有部署成功。一般情况下,下游task会重试,超时后会报出异常。你可以查看下有没有其他的异常日志,查一下上游task为什么没有部署成功。
>
> Best,
> Shammon FY
>
> On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
> zhan...@eastcom-sw.com> wrote:
>
> >
> > 异常日志内容
> >
> > 2023-07-03 20:30:15,164 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink:
> > Sink 3 (2/45)
> > (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> > 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> > nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> > org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > Partition
> >
> 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_
> > 3093 not found.
> > at org.apache.flink.runtime.io
> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> > at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
> >
> >
> >
> > 发件人: zhan...@eastcom-sw.com
> > 发送时间: 2023-07-04 09:25
> > 收件人: user-zh
> > 主题: PartitionNotFoundException循环重启
> > hi,我这有两个流量比较大的job(一天3亿/6亿),在启动正常运行了5、6天左右就会出现
> > PartitionNotFoundException 的异常,然后不断的循环重启
> >
> > 在flink-conf.yaml中添加以下参数后,也是同样在6天后会 循环报 PartitionNotFoundException
> > 的异常后,不断的重启
> > taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> > taskmanager.network.max-num-tcp-connections: 16
> >
> > 当前版本 1.17.1,同样的job跟数据在1.14.4中一直没问题,请问这个有什么办法解决么?
> >
> >
>