HI ,
   好的,我准备测试以下,目前flink 版本是FLINK-1.12.4,kafka-connector flink-connector-kafka_2.11

在 2022-01-21 14:36:06,"selves_nan" <selves_...@163.com> 写道:
>Hi,这个事务id自己指定即可,如果指定了之后还是报错,方便给下用到的flink和kafka-connector版本吗,目前在使用的版本没有看到相关的api
>
>
>| |
>selves_nan
>|
>|
>selves_...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2022年01月21日 13:00,潘明文<panmin...@163.com> 写道:
>HI,
>"生产者的事务id"  怎么获取呀?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2022-01-21 10:41:37,"selves_nan" <selves_...@163.com> 写道:
>Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。
>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id");
>//开启幂等性
>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
>
>
>| |
>selves_nan
>|
>|
>selves_...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2022年01月20日 14:39,潘明文<panmin...@163.com> 写道:
>hi,
>我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。
>
>FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, 
>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>
>
>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
>producer attempted to use a producer id which is not currently assigned to its 
>transactional id.
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>at 
>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>at 
>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>at java.lang.Thread.run(Thread.java:748)
>Suppressed: java.lang.NullPointerException
>

回复