Hi Flinkers,

I encountered a Kafka sink exception when starting a job from a savepoint. The message is as below:
java.lang.IllegalStateException: Received element after endOfInput: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@3f797ef9


My environment:
Flink version: 1.20.1
Kafka Flink connector version: 3.3.0
JDK version: 1.8


A sample Flink SQL job that reproduces this:
----------------------------------------------------
CREATE TEMPORARY TABLE `t1` (
  `org_id` VARCHAR
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TEMPORARY TABLE `kafka_out` (
  `org_id` VARCHAR
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'xxx:9092',
  'topic' = 'test2',
  'value.format' = 'json',
  'properties.enable.idempotence' = 'false'
);
INSERT INTO kafka_out SELECT * FROM t1;

------------------------------------------


Steps to reproduce:
1. Start the SQL job, then stop it with a savepoint.
2. Restart the job from that savepoint. It then fails with:
2025-12-08 17:14:44.002 [kafka_out[2]: Writer -> kafka_out[2]: Committer (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - kafka_out[2]: Writer -> kafka_out[2]: Committer (1/1)#0 (54d08e14b76ac2f5d7f195c7732acee6_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED with failure cause:
java.lang.IllegalStateException: Received element after endOfInput: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@3f797ef9
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:206)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)
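For completeness, the stop/restart sequence was roughly the following. The job ID, savepoint directory, and jar name are placeholders, and the exact invocation depends on how the job is submitted (e.g. via SQL Client the restore path is set through configuration instead):

```shell
# Stop the running job and take a savepoint (job ID and path are placeholders)
flink stop --savepointPath /tmp/flink-savepoints <jobId>

# Restart the same job from the savepoint the stop command printed
flink run -s /tmp/flink-savepoints/savepoint-xxxxxx -d my-sql-job.jar
```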




Digging into the Flink 1.20 source code of SinkWriterOperator, I found that:
1. When the savepoint was triggered, SourceOperator's stop method was called, and emitNextNotReading emitted the status DataInputStatus.END_OF_DATA.
2. END_OF_DATA eventually led to SinkWriterOperator.endInput() being called, which sets the variable endOfInput = true. The call stack:

	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.endInput(SinkWriterOperator.java:232)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:101)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:154)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:154)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:161)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:695)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:653)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)




3. The endOfInput flag was stored in operator state in the savepoint file.
4. When restoring from this savepoint, endOfInput was set back to true, so SinkWriterOperator throws an exception when processing any new incoming record. The source code is as below:
@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
    checkState(!endOfInput, "Received element after endOfInput: %s", element);
    context.element = element;
    sinkWriter.write(element.getValue(), context);
}
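The failure mode described in points 2-4 can be illustrated with a minimal standalone sketch. This is plain Java, not Flink code; the class and method names here are made up for illustration, with only the endOfInput flag and the precondition check mirroring the real operator:

```java
// Standalone sketch of the failure mode: if the endOfInput flag survives a
// snapshot/restore cycle, the first record processed after restore trips
// the precondition, matching the IllegalStateException above.
public class EndOfInputSketch {

    // Stand-in for SinkWriterOperator and its endOfInput flag.
    static class SinkOperator {
        boolean endOfInput;

        // Restore a flag value that was (incorrectly) captured in a savepoint.
        void restore(boolean savedEndOfInput) {
            this.endOfInput = savedEndOfInput;
        }

        // Called on the stop-with-savepoint path (END_OF_DATA -> endInput).
        void endInput() {
            endOfInput = true;
        }

        void processElement(String element) {
            if (endOfInput) {
                throw new IllegalStateException(
                        "Received element after endOfInput: " + element);
            }
            // ... would hand the element to the sink writer here ...
        }
    }

    public static void main(String[] args) {
        SinkOperator op = new SinkOperator();
        op.processElement("row-1"); // fine while the job is running

        op.endInput();              // stop-with-savepoint sets the flag

        // Pretend the flag was snapshotted and restored into a new operator.
        SinkOperator restored = new SinkOperator();
        restored.restore(op.endOfInput);

        try {
            restored.processElement("row-2"); // first record after restore
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            // prints "Received element after endOfInput: row-2"
        }
    }
}
```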


Is there any way to work around this issue?

Best regards,
Jian
