Hi

从`Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
Cluster authorization failed.`这个错误看起来像是权限错误,可以你检查下是否有权限问题

Best,
Shammon

On Fri, Feb 17, 2023 at 6:29 PM lxk <lxk7...@163.com> wrote:

> Flink版本:1.16
> 目前公司针对Flink版本进行升级,从Flink1.14升级到Flink1.16,代码没做任何调整,但是在写入kafka的时候报错:
> 2023-02-17 15:03:19
> org.apache.kafka.common.KafkaException: Cannot execute transactional
> method because we are in an error state
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
> at
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
> at
> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
> Cluster authorization failed.
>
>
> 在了解了相关源码之后,知道KafkaSink这种新的kafka
> api在实现精准一次的时候,分为了两个阶段,一个是writer,一个是commiter,其中在writer中维护了一个producerpool,因此需要权限创建producer,这块能理解。
> 但是在使用老的kafka
> api,即FlinkKafkaProducer时,只需要维护一个Producer。不明白为啥在使用老的api的时候还是会报同样的错误。
>
>
> 或者我说的原因不是这个报错的根本原因,希望大家能帮忙解答下

回复