Unfortunately, I only have the truncated stack trace available (from the Flink UI).
L
2021-04-27 16:32:02
java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:924)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
    ... 10 more
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
    at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
    at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:69)
    at org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.snapshotState(KafkaSourceReader.java:96)
    at org.apache.flink.streaming.api.operators.SourceOperator.snapshotState(SourceOperator.java:288)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
    ... 20 more
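For reference, the root cause is the OffsetAndMetadata constructor rejecting an offset below zero. A minimal sketch of that validation (my assumption of what the source reader runs into; it only needs kafka-clients on the classpath, and the -1 is an illustrative sentinel, not a value I actually observed):

import org.apache.kafka.clients.consumer.OffsetAndMetadata

object NegativeOffsetRepro {
  def main(args: Array[String]): Unit = {
    // OffsetAndMetadata validates its offset argument and throws
    // IllegalArgumentException("Invalid negative offset") for values < 0.
    // Presumably KafkaSourceReader.snapshotState hands it such an offset
    // (e.g. an uninitialized sentinel like -1) for one of its splits.
    new OffsetAndMetadata(-1L)
  }
}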
________________________________
From: Till Rohrmann <[email protected]>
Sent: Thursday, April 29, 2021 18:44
To: Lars Skjærven <[email protected]>
Cc: Becket Qin <[email protected]>; [email protected] <[email protected]>
Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing
Thanks for the additional information, Lars. Could you maybe also share the full stack traces of the errors you see when the checkpoint fails?
@Becket Qin <[email protected]>, is it a known issue with the new Kafka sources trying to checkpoint negative offsets?
Cheers,
Till
On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven <[email protected]> wrote:
Thanks Till.
Here is how we created the KafkaSource:
val sensorSource = KafkaSource.builder[SensorInput]()
  .setBootstrapServers(myConfig.kafkaBrokers)
  .setGroupId(myConfig.kafkaGroupId)
  .setTopics(myConfig.kafkaTopicIn)
  .setDeserializer(new SensorInputPBDeserializationSchema)
  .setStartingOffsets(OffsetsInitializer.earliest())
  .build()
The stream was built with
env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer that does SensorInputPB.parseFrom(record.value()) and finally calls collector.collect(v).
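In case it helps, the schema is essentially the following (a simplified sketch, not our exact class: I'm treating SensorInput as the protobuf-generated message type here, and the interface/method names are those of the 1.12 KafkaRecordDeserializer, as far as I recall):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

class SensorInputPBDeserializationSchema extends KafkaRecordDeserializer[SensorInput] {

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]],
                           collector: Collector[SensorInput]): Unit = {
    // In the real schema this is SensorInputPB.parseFrom(record.value());
    // in this sketch SensorInput stands in for that parsed protobuf type.
    val v = SensorInput.parseFrom(record.value())
    collector.collect(v)
  }

  override def getProducedType: TypeInformation[SensorInput] =
    TypeInformation.of(classOf[SensorInput])
}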
From here on we're doing a keyed windowed aggregation with
.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new SensorEventAggregator)
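Put together, the relevant part of the pipeline looks roughly like this (the key selector field 'sensorId' and the SensorEventAggregator body are placeholders for our actual implementation):

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// 'sensorId' is an illustrative key field; SensorEventAggregator is our
// AggregateFunction (body omitted here).
val aggregated = env
  .fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
  .keyBy(_.sensorId)
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .aggregate(new SensorEventAggregator)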
L
________________________________
From: Till Rohrmann <[email protected]>
Sent: Thursday, April 29, 2021 09:16
To: Lars Skjærven <[email protected]>; Becket Qin <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing
Hi Lars,
The KafkaSourceBuilder constructs the new KafkaSource, which has not been fully hardened in 1.12.2. In fact, it should not be documented yet. I think you are running into an instability/bug of this new source. The new Kafka source should be hardened a lot more in the 1.13.0 release.
Could you tell us exactly how you created the KafkaSource so that we can verify that this problem has been properly fixed in the 1.13.0 release? I am also pulling in Becket, who is the original author of this connector. He might be able to tell you more.
Cheers,
Till
On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <[email protected]> wrote:
Hello,
I ran into some issues when using the new KafkaSourceBuilder (running Flink 1.12.2, Scala 2.12.13, on Ververica Platform in a container with Java 8). Initially it generated warnings on the Kafka configuration, but the job was able to consume and produce messages:
The configuration 'client.id.prefix' was supplied but isn't a known config.
The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
Finally the job crashed with a checkpointing error:
java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
....
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
...
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
Switching back to using FlinkKafkaConsumer (roughly as sketched below), the warnings on the Kafka config disappeared, and the job was able to checkpoint successfully.
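For completeness, the fallback looks roughly like this (property keys are the standard Kafka ones; SensorInputPBKafkaSchema is an illustrative name for the KafkaDeserializationSchema counterpart of the protobuf schema we use with the KafkaSourceBuilder):

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", myConfig.kafkaBrokers)
props.setProperty("group.id", myConfig.kafkaGroupId)

// SensorInputPBKafkaSchema: a KafkaDeserializationSchema[SensorInput] doing the
// same protobuf parsing as the new-source schema (illustrative name only).
val consumer = new FlinkKafkaConsumer[SensorInput](
  myConfig.kafkaTopicIn,
  new SensorInputPBKafkaSchema,
  props)
consumer.setStartFromEarliest()

val sensorStream = env.addSource(consumer)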
I'm wondering if the warnings and the errors are connected, and if there is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?
Thanks,
L