I'm hitting an error that occurs every time the JobManager stops the job to
scale up.
The TaskManager closes its KafkaProducer, hits an exception while doing so,
and dies, reproducibly:
[flink-concurrent-counter-paxid-taskmanager-1-56] 2026-04-07 10:56:24,910
INFO org.apache.kafka.clients.producer.KafkaProducer [] -
[Producer clientId=producer-1] Closing the Kafka producer with
timeoutMillis = 0 ms.
[flink-concurrent-counter-paxid-taskmanager-1-56] 2026-04-07 10:56:24,911
INFO org.apache.kafka.clients.producer.KafkaProducer [] -
[Producer clientId=producer-1] Proceeding to force close the producer since
pending requests could not be completed within timeout 0 ms.
[flink-concurrent-counter-paxid-taskmanager-1-56] 2026-04-07 10:56:25,048
ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] -
Fatal error occurred while executing the TaskManager. Shutting it down...
[flink-concurrent-counter-paxid-taskmanager-1-56] java.io.IOException: One
or more Kafka Producer send requests have encountered exception
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.connector.kafka.sink.KafkaWriter.checkAsyncException(KafkaWriter.java:290)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.connector.kafka.sink.KafkaWriter.close(KafkaWriter.java:207)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.util.IOUtils.closeAll(IOUtils.java:236)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.util.IOUtils.closeAll(IOUtils.java:225)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.close(SinkWriterOperator.java:247)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1197)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1101)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$2(Task.java:958)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$3(Task.java:958)
[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
~[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:794)
[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
[flink-dist-2.2.0.jar:2.2.0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
java.base/java.lang.Thread.run(Unknown Source) [?:?]
[flink-concurrent-counter-paxid-taskmanager-1-56] Caused by:
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka
prd-flink-acorn-feature-egress-9@-1 with
FlinkKafkaInternalProducer@776061509{transactionalId='null',
transactionState=NOT_IN_TRANSACTION, closed=false}
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.decorateException(KafkaWriter.java:346)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.onCompletion(KafkaWriter.java:318)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1578)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:312)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:200)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:1146)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:1133)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:1120)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:299)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] ... 1 more
[flink-concurrent-counter-paxid-taskmanager-1-56] Caused by:
org.apache.kafka.common.KafkaException: Producer is closed forcefully.
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:1133)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:1120)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:299)
~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
[flink-concurrent-counter-paxid-taskmanager-1-56] ... 1 more
As far as I can tell, the KafkaWriter closes its producer via IOUtils.closeAll:
https://raw.githubusercontent.com/apache/flink-connector-kafka/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
IOUtils then calls close() on the producer:
https://github.com/apache/flink/blob/release-2.2/flink-core/src/main/java/org/apache/flink/util/IOUtils.java#L257
and the producer (FlinkKafkaInternalProducer)
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
calls super.close(Duration.ZERO) on the underlying KafkaProducer:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1518
The zero timeout means the producer is force-closed: pending send requests are
aborted, their callbacks record the failure, and KafkaWriter.checkAsyncException
then rethrows it as the fatal IOException above.
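To make sure I'm reading the sequence correctly, here is a toy model of the
close path in plain Java (no Flink/Kafka classes; ToyProducer and ToyWriter
are my own stand-ins for KafkaProducer and KafkaWriter, based only on the
stack trace above). It reproduces the ordering problem: force-closing aborts
the in-flight sends, the callbacks record the exception, and the writer's own
close() then surfaces it:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Toy model of the close sequence in the stack trace.
// All classes here are stand-ins, not the real Flink/Kafka ones.
public class ForceCloseModel {

    /** Stand-in for KafkaProducer: a zero-timeout close aborts pending batches. */
    static class ToyProducer {
        final List<Runnable> pendingCallbacks = new ArrayList<>();

        void send(Runnable onAbort) {
            pendingCallbacks.add(onAbort); // record stays "in flight"
        }

        /** Mirrors "Proceeding to force close the producer ...":
         *  every incomplete batch is completed exceptionally. */
        void forceClose() {
            for (Runnable cb : pendingCallbacks) cb.run();
            pendingCallbacks.clear();
        }
    }

    /** Stand-in for KafkaWriter: stores async send errors, rethrows on close(). */
    static class ToyWriter implements AutoCloseable {
        final ToyProducer producer = new ToyProducer();
        volatile Exception asyncError;

        void write() {
            // Like WriterCallback.onCompletion: stash the failure for later.
            producer.send(() -> asyncError =
                    new RuntimeException("Producer is closed forcefully."));
        }

        @Override
        public void close() throws IOException {
            producer.forceClose();   // abort pending sends -> callbacks fire
            if (asyncError != null)  // like KafkaWriter.checkAsyncException
                throw new IOException(
                        "One or more Kafka Producer send requests have encountered exception",
                        asyncError);
        }
    }

    public static void main(String[] args) {
        ToyWriter writer = new ToyWriter();
        writer.write(); // one record still in flight when shutdown begins
        try {
            writer.close();
            System.out.println("closed cleanly");
        } catch (IOException e) {
            System.out.println("fatal: " + e.getMessage());
        }
    }
}
```

So with even a single in-flight record at shutdown, a clean close seems
impossible by construction, which matches what I see on every rescale.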
I don't think force-closing the producer is a good idea, and the TaskManager
apparently cannot handle the resulting exception either. Unfortunately, this
appears to be hardcoded behaviour that I cannot change via a Kafka producer
config. I would appreciate some help here.
Best Regards
Clemens