Hi Clemens,
This is a known issue that has been fixed on the connector’s main
branch but has not been released yet.
JIRA: https://issues.apache.org/jira/browse/FLINK-38947
Fix: https://github.com/apache/flink-connector-kafka/pull/214
What’s happening:
When Flink stops a task (e.g., during rescaling), KafkaWriter.close()
is invoked. It force-closes the underlying Kafka producer with
Duration.ZERO, which aborts all pending (unacknowledged) batches in
the producer’s RecordAccumulator. Each aborted batch triggers an error
callback with KafkaException("Producer is closed forcefully.").
In the released connector versions, WriterCallback.onCompletion() does
not check whether the writer is already shutting down and
unconditionally stores the exception. Then checkAsyncException() at
the end of close() re-throws it as an IOException (with root cause
KafkaException: Producer is closed forcefully in your message), which
propagates through the task cleanup path and can fail the TaskManager.
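To make the failure mode concrete, here is a heavily simplified sketch of that path (hypothetical class names, not the actual connector code): the callback stores every send error, even aborts caused by the writer's own force-close, so close() ends up rethrowing a self-inflicted exception.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the released behavior. BuggyWriterSketch is a
// stand-in for KafkaWriter; the real code is more involved.
class BuggyWriterSketch {
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    // Invoked by the Kafka IO thread for every completed or aborted batch.
    void onCompletion(Exception exception) {
        if (exception != null) {
            // No shutdown check: an abort caused by our own force-close
            // ("Producer is closed forcefully.") is recorded exactly like
            // a genuine send failure.
            asyncError.compareAndSet(null, exception);
        }
    }

    void checkAsyncException() throws IOException {
        Exception e = asyncError.get();
        if (e != null) {
            throw new IOException(
                    "One or more Kafka Producer send requests have encountered exception", e);
        }
    }

    void close() throws IOException {
        // Force-closing the producer aborts pending batches, which fires
        // the error callback before we reach the check below. We simulate
        // that abort with a direct call here.
        onCompletion(new RuntimeException("Producer is closed forcefully."));
        // ...so close() itself throws and fails task cleanup.
        checkAsyncException();
    }
}
```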
You are correct that this cannot be solved via Kafka producer
configuration; it is a connector-level issue in how shutdown errors
are handled.
What the fix does (in KafkaWriter):
1. Makes the closed field volatile to ensure visibility between the
task thread and the Kafka IO thread.
2. Adds a closed guard in WriterCallback.onCompletion() so that
callbacks arriving after shutdown has started are ignored.
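In the same simplified terms, the fix amounts to the following sketch (again a hypothetical stand-in, not the actual PR code): a volatile closed flag makes the shutdown visible to the Kafka IO thread, and the callback drops errors that arrive after shutdown has begun.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the fixed behavior in KafkaWriter.
class FixedWriterSketch {
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    // volatile: written by the task thread in close(), read by the
    // Kafka IO thread in onCompletion().
    private volatile boolean closed = false;

    void onCompletion(Exception exception) {
        // Guard: once shutdown has begun, aborted-batch callbacks
        // ("Producer is closed forcefully.") are expected and ignored.
        if (exception != null && !closed) {
            asyncError.compareAndSet(null, exception);
        }
    }

    void checkAsyncException() throws IOException {
        Exception e = asyncError.get();
        if (e != null) {
            throw new IOException(
                    "One or more Kafka Producer send requests have encountered exception", e);
        }
    }

    void close() throws IOException {
        closed = true; // set before the force-close aborts pending batches
        // Simulated abort from the force-closed producer; now ignored.
        onCompletion(new RuntimeException("Producer is closed forcefully."));
        checkAsyncException(); // nothing stored, so close() succeeds
    }
}
```

Genuine send failures that occur before shutdown are still stored and surfaced; only the expected shutdown-time aborts are suppressed.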
Potential workaround until the next release:
You can build the connector from the main branch to pick up the fix.
On Tue, 7 Apr 2026 at 15:08, Clemens S <[email protected]> wrote:
>
> I encountered this error that happened every time the job manager stopped the
> job to scale up.
> The task manager stops the KafkaProducer, encounters an exception, and dies
> every single time:
>
> [flink-concurrent-counter-paxid-taskmanager-1-56] 2026-04-07 10:56:24,910
> INFO org.apache.kafka.clients.producer.KafkaProducer [] -
> [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis
> = 0 ms.
> [flink-concurrent-counter-paxid-taskmanager-1-56] 2026-04-07 10:56:24,911
> INFO org.apache.kafka.clients.producer.KafkaProducer [] -
> [Producer clientId=producer-1] Proceeding to force close the producer since
> pending requests could not be completed within timeout 0 ms.
>
> [flink-concurrent-counter-paxid-taskmanager-1-56] 2026-04-07 10:56:25,048
> ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal
> error occurred while executing the TaskManager. Shutting it down...
> [flink-concurrent-counter-paxid-taskmanager-1-56] java.io.IOException: One or
> more Kafka Producer send requests have encountered exception
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.connector.kafka.sink.KafkaWriter.checkAsyncException(KafkaWriter.java:290)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.connector.kafka.sink.KafkaWriter.close(KafkaWriter.java:207)
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.util.IOUtils.closeAll(IOUtils.java:236)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.util.IOUtils.closeAll(IOUtils.java:225)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.close(SinkWriterOperator.java:247)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1197)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1101)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$2(Task.java:958)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$3(Task.java:958)
> [flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
> ~[flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:794)
> [flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
> [flink-dist-2.2.0.jar:2.2.0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> java.base/java.lang.Thread.run(Unknown Source) [?:?]
> [flink-concurrent-counter-paxid-taskmanager-1-56] Caused by:
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka
> prd-flink-acorn-feature-egress-9@-1 with
> FlinkKafkaInternalProducer@776061509{transactionalId='null',
> transactionState=NOT_IN_TRANSACTION, closed=false}
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.decorateException(KafkaWriter.java:346)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.onCompletion(KafkaWriter.java:318)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1578)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:312)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:200)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:1146)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:1133)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:1120)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:299)
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] ... 1 more
> [flink-concurrent-counter-paxid-taskmanager-1-56] Caused by:
> org.apache.kafka.common.KafkaException: Producer is closed forcefully.
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:1133)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:1120)
>
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:299)
> ~[blob_p-c4f01cac09c2a184c2aa0f2916c77912120e2565-39854d6a628e76fba3f670bfa00160d8:flink-2-2-0]
> [flink-concurrent-counter-paxid-taskmanager-1-56] ... 1 more
>
>
> the kafkaWriter calls closeAll on its producer via IOUtils:
> https://raw.githubusercontent.com/apache/flink-connector-kafka/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
>
> the IOUtils call close() on the producer:
> https://github.com/apache/flink/blob/release-2.2/flink-core/src/main/java/org/apache/flink/util/IOUtils.java#L257
>
> and the producer (FlinkKafkaInternalProducer)
> https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
>
> calls super.close(Duration.ZERO); on the KafkaProducer
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1518
> which then throws the exception.
>
> I don't think it's a good idea to force close the producer, and the
> TaskManager doesn't seem to be able to handle it either. Unfortunately it
> seems hardcoded behaviour that I cannot solve via a kafka producer config?
>
> Would appreciate some help here
> Best Regards
> Clemens
--
Kind regards,
Aleksandr