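Hi,

Judging from `Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.`, this looks like a permissions error. Could you check whether the Kafka user the job connects as has the required permissions on the cluster?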
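
In case it helps narrow things down, here is a minimal sketch, not a confirmed fix. It assumes the Kafka client pulled in by the newer connector enables idempotence by default (this is the case for kafka-clients 3.0 and later), which can require extra cluster-level permissions on older brokers even when exactly-once is not used. The class name `ProducerProps`, the method `build`, and the broker address are made up for illustration; only the keys `bootstrap.servers` and `enable.idempotence` are real Kafka producer settings.

```java
import java.util.Properties;

public class ProducerProps {

    // Hypothetical helper: builds the Kafka producer properties for a Flink job.
    public static Properties build(String bootstrapServers, boolean exactlyOnce) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        if (!exactlyOnce) {
            // Assumption: the authorization failure comes from idempotence being
            // on by default. Disabling it explicitly tests that theory for
            // at-least-once jobs.
            props.setProperty("enable.idempotence", "false");
        }
        // For exactly-once, idempotence has to stay enabled because transactions
        // depend on it, so the permissions would need to be granted on the
        // broker side instead.
        return props;
    }

    public static void main(String[] args) {
        // "broker:9092" is a placeholder address.
        Properties props = build("broker:9092", false);
        System.out.println(props);
    }
}
```

The resulting `Properties` could then be passed to `KafkaSink.builder().setKafkaProducerConfig(...)` or to the `FlinkKafkaProducer` constructor. If the error persists with idempotence disabled, the cause is more likely a missing ACL for the job's user.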

Best,
Shammon

On Fri, Feb 17, 2023 at 6:29 PM lxk <lxk7...@163.com> wrote:

> Flink version: 1.16
>
> Our company is upgrading Flink from 1.14 to 1.16. The code was not changed at all, but writing to Kafka now fails with:
>
> 2023-02-17 15:03:19
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
>
> After reading the relevant source code, I understand that KafkaSink, the new Kafka API, implements exactly-once in two stages: a writer and a committer. The writer maintains a producer pool, so it needs permission to create producers. That part I understand.
>
> With the old Kafka API, FlinkKafkaProducer, only a single producer has to be maintained, so I don't understand why the same error is still reported when using the old API.
>
> Or perhaps what I described is not the root cause of this error. I hope someone can help explain.