Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, work around the issue by taking 
two separate actions when stopping the job: take a savepoint, then cancel.
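
For anyone else hitting this, the two-step workaround can be sketched roughly 
as below. It is only a minimal sketch: it assumes the standard "flink 
savepoint <job-id> <target-dir>" and "flink cancel <job-id>" CLI commands are 
on the PATH, and the helper names (stop_commands, stop_with_workaround) are 
made up for illustration.

```python
import subprocess


def stop_commands(job_id, savepoint_dir):
    """Return the two CLI invocations in order: savepoint first, then cancel."""
    return [
        ["flink", "savepoint", job_id, savepoint_dir],
        ["flink", "cancel", job_id],
    ]


def stop_with_workaround(job_id, savepoint_dir, run=subprocess.run):
    """Run both steps; `run` is injectable so the sequence can be dry-run."""
    for cmd in stop_commands(job_id, savepoint_dir):
        # check=True raises if a step fails, so the job is not cancelled
        # when the savepoint itself did not complete.
        run(cmd, check=True)
```

Unlike "flink stop", the two steps are not atomic: records processed between 
the savepoint and the cancel may be processed again after restoring, which 
can mean duplicates on an AT_LEAST_ONCE egress.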
________________________________
From: Kezhu Wang <kez...@gmail.com>
Sent: Sunday, February 28, 2021 12:31 AM
To: user@flink.apache.org <user@flink.apache.org>; Meissner, Dylan 
<dylan.t.meiss...@nordstrom.com>
Subject: Re: Stateful functions 2.2 and stop with savepoint

Hi,

Thanks for reporting. I think it is a Flink bug and have created FLINK-21522 
for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang


On February 28, 2021 at 00:59:04, Meissner, Dylan 
(dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented as 
this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(
    ... props required to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(
    EGRESS_ID, FlinkKafkaProducer(serializationSchema, props, AT_LEAST_ONCE))

Checkpointing, and taking a savepoint without stopping the job, both work as 
expected.

However, when I run "flink stop <job-id>" or even "flink stop --drain 
<job-id>", the operation never completes, reporting IN_PROGRESS until I hit the 
"failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: 
Checkpoint expired before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed their 
work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | end-to-end duration: 638ms | data-size: 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge: 0/1 (0%) | end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?
