Re:Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0
感谢回复,我这边问题已经修复了,修改一下clients的版本到2.4.1就可以了 在 2022-08-26 16:20:27,"Weihua Hu" 写道: >可以尝试升级到 2.5+ > >Best, >Weihua > > >On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg wrote: > >> 您好,集群版本是1.1.1,是挺低的,是这个原因吗,升级到多少合适呢 >> 在 2022-08-25 18:31:06,"Weihua Hu" 写道: >> >kafka 集群的版本是什么呢?看起来是集群版本有点低了 >> > >> >Best, >> >Weihua >> > >> > >> >On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: >> > >> >> 大佬们好: >> >> >> >> >> 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4, >> >> >> >> 异常如下: >> >> >> >> 2022-08-25 10:42:44 >> >> >> >> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted >> to write a non-default producerId at version 0 >> >> >> >> 相关代码如下: >> >> Properties properties = new Properties(); >> >> properties.put("bootstrap.servers", >> >> KafkaConstant.bootstrap_servers_01); >> >> properties.put("transaction.timeout.ms", 15 * 60 * 1000); >> >> FlinkKafkaProducer statsLogV2Producer = new >> >> FlinkKafkaProducer<>( >> >> KafkaConstant.topic_01, >> >> new MyKafkaSerializationSchema(KafkaConstant.topic_01), >> >> properties , >> >> FlinkKafkaProducer.Semantic.EXACTLY_ONCE); >> >> >>
Re:Re: 基于savepoint重启作业无法保证端到端一致性
flinkKafkaProducer指定EXACTLY_ONCE语义了吗 在 2022-08-26 16:50:33,"杨扬" 写道: >kafka-2.4.1 >flink-1.14.2 > > > > >> 在 2022年8月26日,下午4:42,Hangxiang Yu 写道: >> >> flink会保证自身的exactly once语义,端到端的exactly once的语义是需要source和sink保证幂等的; >> 你用的kafka是哪个版本? >> >> On Fri, Aug 26, 2022 at 4:08 PM 杨扬 wrote: >> >>> 各位好! >>>目前有一flink作业,source与sink均为kafka。 >>>在换版时(未修改任何代码)基于官网文档命令,创建savepoint并停止作业;而后基于之前创建的savepoint启动作业。 >>>现在发现如此操作无法实现启停前后数据无缝对接,会出现一定的数据重复。 >>> >>>想请教这个问题是savepoint设计时本身就无法保证启停前后端到端一致性,还是我们哪里操作不当呢? >>> >>> >>> >>> >>> >>> >>> >> >> -- >> Best, >> Hangxiang. >> >> === >> 此邮件已由 Deep Discovery Email Inspector 进行了分析。 >
Re:Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0
您好,集群版本是1.1.1,是挺低的,是这个原因吗,升级到多少合适呢 在 2022-08-25 18:31:06,"Weihua Hu" 写道: >kafka 集群的版本是什么呢?看起来是集群版本有点低了 > >Best, >Weihua > > >On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg wrote: > >> 大佬们好: >> >> 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4, >> >> 异常如下: >> >> 2022-08-25 10:42:44 >> >> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to >> write a non-default producerId at version 0 >> >> 相关代码如下: >> Properties properties = new Properties(); >> properties.put("bootstrap.servers", >> KafkaConstant.bootstrap_servers_01); >> properties.put("transaction.timeout.ms", 15 * 60 * 1000); >> FlinkKafkaProducer statsLogV2Producer = new >> FlinkKafkaProducer<>( >> KafkaConstant.topic_01, >> new MyKafkaSerializationSchema(KafkaConstant.topic_01), >> properties , >> FlinkKafkaProducer.Semantic.EXACTLY_ONCE); >>
Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0
大佬们好: 我这边指定EXACTLY_ONCE写kafka后,任务直接起能起来,但是从savepoint起任务总是失败,kafka-clients版本2.5.0,flink版本及相关依赖版本均为1.12.4, 异常如下: 2022-08-25 10:42:44 org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0 相关代码如下: Properties properties = new Properties(); properties.put("bootstrap.servers", KafkaConstant.bootstrap_servers_01); properties.put("transaction.timeout.ms", 15 * 60 * 1000); FlinkKafkaProducer statsLogV2Producer = new FlinkKafkaProducer<>( KafkaConstant.topic_01, new MyKafkaSerializationSchema(KafkaConstant.topic_01), properties , FlinkKafkaProducer.Semantic.EXACTLY_ONCE);