Hi, Thanks for reporting. I think it is a Flink bug and have created FLINK-21522 for it. You could track progress there.
FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522 Best, Kezhu Wang On February 28, 2021 at 00:59:04, Meissner, Dylan ( dylan.t.meiss...@nordstrom.com) wrote: I have an embedded function with a SinkFunction as an egress, implemented as this pseudo-code: val serializationSchema = KafkaSchemaSerializationSchema(... props required to use a Confluent Schema Registry with Avro, auth etc ...) return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema, props, AT_LEAST_ONCE)) Checkpointing and taking a savepoint without stopping work as expected. However, when I run "flink stop <job-id>" or even "flink stop --drain <job-id>", the operation never completes, reporting IN_PROGRESS until I hit the "failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing" CompletedException. In the "Checkpoint History" it shows only 2 of my 3 operators completed their work: Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | end-to-end duration: 638ms | data-size 1.38 KB feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | end-to-end duration: n/a | data-size: n/a feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B I've been unable to gain any insights from logs so far. Thoughts?