Hi,

You could also try `cancel --withSavepoint [savepointDir]`, even though it
is deprecated. Compared to the take-savepoint-then-cancel approach, no
checkpoints are taken between the savepoint and the cancellation. This may
be important if your job has two-phase-commit operators.
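
For reference, a rough sketch of the two approaches as shell helpers. The
function names and the FLINK_BIN variable are hypothetical, the job ID and
savepoint directory are placeholders you supply, and this assumes the
`flink` CLI is on your PATH:

```shell
#!/usr/bin/env bash
# Sketch only: helper names and FLINK_BIN are illustrative assumptions.
FLINK_BIN="${FLINK_BIN:-flink}"

# Single step (deprecated): the savepoint and the cancellation happen
# together, so no checkpoint is triggered in between.
cancel_with_savepoint() {
  local job_id="$1" savepoint_dir="$2"
  "$FLINK_BIN" cancel --withSavepoint "$savepoint_dir" "$job_id"
}

# Two steps: take a savepoint, then cancel only if the savepoint succeeded.
# Regular checkpoints may still be triggered between the two commands.
savepoint_then_cancel() {
  local job_id="$1" savepoint_dir="$2"
  "$FLINK_BIN" savepoint "$job_id" "$savepoint_dir" \
    && "$FLINK_BIN" cancel "$job_id"
}
```

Usage would be along the lines of
`cancel_with_savepoint <job-id> [savepointDir]`.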


Best,
Kezhu Wang


On February 28, 2021 at 20:50:29, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, I will work around it by taking
two separate actions when stopping the job: take a savepoint, then cancel.
------------------------------
*From:* Kezhu Wang <kez...@gmail.com>
*Sent:* Sunday, February 28, 2021 12:31 AM
*To:* user@flink.apache.org <user@flink.apache.org>; Meissner, Dylan <
dylan.t.meiss...@nordstrom.com>
*Subject:* Re: Stateful functions 2.2 and stop with savepoint

Hi,

Thanks for reporting. I think it is a Flink bug and have created
FLINK-21522 for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang

On February 28, 2021 at 00:59:04, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented
as this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(
    ... props required to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(
    EGRESS_ID,
    FlinkKafkaProducer(serializationSchema, props, AT_LEAST_ONCE))

Checkpointing, and taking a savepoint without stopping the job, both work as
expected.

However, when I run "flink stop <job-id>" or even "flink stop --drain
<job-id>", the operation never completes, reporting IN_PROGRESS until I hit
the "failure-cause:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
before completing" CompletionException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed
their work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | end-to-end duration: 638ms | data-size: 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge: 0/1 (0%) | end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?
