HI,

 flink 还是报以下错误:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker


代码如下:
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", KAFKA_ADDR);
prop.setProperty("acks", "all");
//设置producer 幂等性 保证producer 数据写入到broker 不重复
prop.setProperty("enable.idempotence", "true");
// 设置FlinkKafkaProducer里面的事务超时时间,默认broker的最大事务超时时间为15分钟
prop.setProperty("transaction.timeout.ms", transaction + "");
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my_tx_id");
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);


FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
WRITE_TOPIC,
serializationSchema,
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); 














At 2022-03-07 10:06:45, "潘明文" <panmin...@163.com> wrote:

目前出现下错误:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker


















在 2022-01-21 15:15:51,"潘明文" <panmin...@163.com> 写道:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>HI ,
>   好的,我准备测试以下,目前flink 版本是FLINK-1.12.4,kafka-connector 
> flink-connector-kafka_2.11
>
>在 2022-01-21 14:36:06,"selves_nan" <selves_...@163.com> 写道:
>>Hi,这个事务id自己指定即可,如果指定了之后还是报错,方便给下用到的flink和kafka-connector版本吗,目前在使用的版本没有看到相关的api
>>
>>
>>| |
>>selves_nan
>>|
>>|
>>selves_...@163.com
>>|
>>签名由网易邮箱大师定制
>>
>>
>>在2022年01月21日 13:00,潘明文<panmin...@163.com> 写道:
>>HI,
>>"生产者的事务id"  怎么获取呀?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2022-01-21 10:41:37,"selves_nan" <selves_...@163.com> 写道:
>>Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。
>>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id");
>>//开启幂等性
>>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
>>
>>
>>| |
>>selves_nan
>>|
>>|
>>selves_...@163.com
>>|
>>签名由网易邮箱大师定制
>>
>>
>>在2022年01月20日 14:39,潘明文<panmin...@163.com> 写道:
>>hi,
>>我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。
>>
>>FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>>new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, 
>>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>>
>>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
>>The producer attempted to use a producer id which is not currently assigned 
>>to its transactional id.
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>at 
>>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>>at 
>>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>>at java.lang.Thread.run(Thread.java:748)
>>Suppressed: java.lang.NullPointerException
>>





 

回复