Hi all,
Flink throws an error when producing data to Kafka, which causes the job to restart over and over.

What I have observed: after each job starts, it runs normally for roughly 20 days, then this problem appears and the job keeps restarting. I have not been able to find the real cause.

Error message:

org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
    at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
    at java.lang.Thread.run(Thread.java:748)

Kafka producer configuration:

    // InstanceAlreadyExistsException
    prop.setProperty("client.id", "")
    // Set the producer's transaction timeout property, transaction.timeout.ms
    prop.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
    prop.setProperty("max_in_flight_requests_per_connection", "1")
    // Idempotent producer, ENABLE_IDEMPOTENCE_CONFIG
    prop.setProperty("enable_idempotence_config", "true")
    // RETRIES_CONFIG
    prop.setProperty("retries_config", "5")

    val kafkaSink: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
      topic,
      new ResultStringKafkaSerializationSchema(topic),
      prop,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )

Flink checkpointing is configured for EXACTLY_ONCE as well:

    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

Thanks, everyone!
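
A side note on the producer configuration: the Kafka client looks up properties by their dotted key names, so keys such as "max_in_flight_requests_per_connection", "enable_idempotence_config" and "retries_config" are not recognized and the client falls back to its defaults for those settings. The sketch below shows the same values under the dotted keys. The object name ProducerProps and the build() helper are hypothetical, used only for illustration. This is probably not the cause of the EndTxnResponse error by itself, but it removes one unknown while debugging.

```scala
import java.util.Properties

// Hypothetical helper: builds the producer properties from the post,
// using the dotted key names that the Kafka client actually reads.
object ProducerProps {
  def build(): Properties = {
    val prop = new Properties()
    // Transaction timeout of 5 minutes, same value as in the post
    prop.setProperty("transaction.timeout.ms", (1000 * 60 * 5).toString)
    // Key behind ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
    prop.setProperty("max.in.flight.requests.per.connection", "1")
    // Key behind ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
    prop.setProperty("enable.idempotence", "true")
    // Key behind ProducerConfig.RETRIES_CONFIG
    prop.setProperty("retries", "5")
    prop
  }
}
```

The resulting Properties object can be passed to the FlinkKafkaProducer constructor in place of prop. If kafka-clients is on the classpath, the ProducerConfig constants can be used instead of string literals to avoid typos in the key names.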