Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR in the same Flink native cluster (all 1.12.1).

When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:915)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        ... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
        Suppressed: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
            at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
            at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
            at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
            at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
            at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
            at java.base/java.lang.Thread.run(Unknown Source)

Regarding configuration, I have set transaction.max.timeout.ms on the Kafka server to one hour, as advised here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance

Additionally, I have set the producer's TRANSACTIONAL_ID_CONFIG to a random value, i.e. UUID.randomUUID().toString().

Any ideas as to why this would be the case?

Regards,
Morgan
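
For reference, the configuration described above corresponds roughly to the sketch below. This is only an illustration: the class name, the method name, and the broker address are hypothetical and not taken from the actual job. The literal key "transactional.id" is the value of Kafka's ProducerConfig.TRANSACTIONAL_ID_CONFIG constant, and the broker-side setting is shown only as a comment, since it lives in the broker configuration rather than in the job.

```java
import java.util.Properties;
import java.util.UUID;

// Illustrative sketch only; names are hypothetical and not from the actual job.
class ProducerPropsSketch {

    // Broker side (server configuration, not set from the job):
    //   transaction.max.timeout.ms=3600000   (one hour, in milliseconds)

    // Builds producer properties with a freshly generated random transactional id.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Same key as ProducerConfig.TRANSACTIONAL_ID_CONFIG.
        props.setProperty("transactional.id", UUID.randomUUID().toString());
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        producerProperties("localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Each call generates a new random id, so two Properties objects built this way never share the same "transactional.id" value at the point where they are created.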
