Hi Flinkers,
I encountered a Kafka sink exception when restarting a job from a savepoint. The error message is below:
java.lang.IllegalStateException: Received element after endOfInput: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@3f797ef9
My environment:
Flink version: 1.20.1
Flink Kafka connector version: 3.3.0
JDK version: 1.8
The sample Flink SQL job:
----------------------------------------------------
CREATE TEMPORARY TABLE `t1` (
`org_id` VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
CREATE TEMPORARY TABLE `kafka_out` (
`org_id` VARCHAR
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'xxx:9092',
'topic' = 'test2',
'value.format' = 'json',
'properties.enable.idempotence' = 'false'
);
insert into kafka_out select * from t1;
------------------------------------------
Steps to reproduce:
1. Start this SQL job and then stop it with a savepoint.
2. Restart the job from that savepoint. It then fails with the following (a sketch of the stop/restore statements follows after the stack trace):
2025-12-08 17:14:44.002 [kafka_out[2]: Writer -> kafka_out[2]: Committer (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - kafka_out[2]: Writer -> kafka_out[2]: Committer (1/1)#0 (54d08e14b76ac2f5d7f195c7732acee6_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED with failure cause:
java.lang.IllegalStateException: Received element after endOfInput: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@3f797ef9
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
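For reference, the stop/restore can be expressed with the SQL Client along these lines. This is only a hedged sketch, assuming the job is managed through the SQL Client; the job id and savepoint path are placeholders, not the actual values from my environment:
----------------------------------------------------
-- step 1: stop the running job and take a savepoint
STOP JOB '<job-id>' WITH SAVEPOINT;

-- step 2: restart the same INSERT from that savepoint
SET 'execution.savepoint.path' = '/path/to/savepoint-xxxx';
insert into kafka_out select * from t1;
----------------------------------------------------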
Digging into the Flink 1.20 source code of SinkWriterOperator, I found the following:
1. When stop-with-savepoint was triggered, the SourceOperator's stop() method was called, and emitNextNotReading() returned the status DataInputStatus.END_OF_DATA.
2. END_OF_DATA finally led to SinkWriterOperator.endInput(), which set the field endOfInput = true. The call stack:
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.endInput(SinkWriterOperator.java:232)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:101)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:154)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:154)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:695)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
3. The endOfInput flag was persisted as operator state in the savepoint file.
4. When the job was restored from this savepoint, endOfInput was set back to true, so SinkWriterOperator threw an exception on the next incoming record. The relevant source code:
@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
    checkState(!endOfInput, "Received element after endOfInput: %s", element);
    context.element = element;
    sinkWriter.write(element.getValue(), context);
}
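To make the failure mode easier to see, here is a minimal, self-contained sketch in plain Java (it is not Flink's actual operator code, only Preconditions from flink-core is reused) of the behaviour described in points 1-4: an endOfInput flag that is set on drain, carried over by a restore, and then trips the same precondition as SinkWriterOperator.processElement:
----------------------------------------------------
import org.apache.flink.util.Preconditions;

public class EndOfInputRestoreSketch {

    // stand-in for the sink writer operator's state
    static class WriterStub {
        boolean endOfInput;

        void endInput() {
            // called when END_OF_DATA reaches the sink (points 1 and 2 above)
            endOfInput = true;
        }

        void processElement(Object element) {
            // mirrors the checkState in SinkWriterOperator.processElement
            Preconditions.checkState(
                    !endOfInput, "Received element after endOfInput: %s", element);
            // ... hand the element to the SinkWriter ...
        }
    }

    public static void main(String[] args) {
        WriterStub beforeStop = new WriterStub();
        beforeStop.processElement("row-1"); // fine while the job is running
        beforeStop.endInput();              // stop-with-savepoint ends the input

        // pretend the flag was written into the savepoint (point 3) and restored (point 4)
        WriterStub afterRestore = new WriterStub();
        afterRestore.endOfInput = beforeStop.endOfInput;

        afterRestore.processElement("row-2"); // throws the IllegalStateException from the report
    }
}
----------------------------------------------------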
Is there any way to work around this issue?
Best regards,
Jian