我们改了权限确实解决了这个问题。但我现在想了解的是为什么Flink在1.16的时候需要创建producerID的权限,以及这个权限是不是针对新老Kafka 
API都需要的。针对新的Kafka 
API在精准一次的时候需要管理ProducerID在源码中有体现,但是老的API没看见相关的,只使用了一个ProducerID也需要由Flink内部自己管理吗?

















在 2023-02-20 08:45:18,"Shammon FY" <zjur...@gmail.com> 写道:
>Hi
>
>从`Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
>Cluster authorization failed.`这个错误看起来像是权限错误,可以你检查下是否有权限问题
>
>Best,
>Shammon
>
>On Fri, Feb 17, 2023 at 6:29 PM lxk <lxk7...@163.com> wrote:
>
>> Flink版本:1.16
>> 目前公司针对Flink版本进行升级,从Flink1.14升级到Flink1.16,代码没做任何调整,但是在写入kafka的时候报错:
>> 2023-02-17 15:03:19
>> org.apache.kafka.common.KafkaException: Cannot execute transactional
>> method because we are in an error state
>> at
>> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
>> at
>> org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
>> at
>> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
>> at
>> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>> at org.apache.flink.streaming.runtime.io
>> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>> at org.apache.flink.streaming.runtime.io
>> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>> at org.apache.flink.streaming.runtime.io
>> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>> at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException:
>> Cluster authorization failed.
>>
>>
>> 在了解了相关源码之后,知道KafkaSink这种新的kafka
>> api在实现精准一次的时候,分为了两个阶段,一个是writer,一个是commiter,其中在writer中维护了一个producerpool,因此需要权限创建producer,这块能理解。
>> 但是在使用老的kafka
>> api,即FlinkKafkaProducer时,只需要维护一个Producer。不明白为啥在使用老的api的时候还是会报同样的错误。
>>
>>
>> 或者我说的原因不是这个报错的根本原因,希望大家能帮忙解答下

回复