Hi all, My BAD!!!
Sorry for the apparent mix-up at that moment. I will write a separate test for stream iterations. The stateful functions part should be a separate issue.

Best,
Kezhu Wang

On March 4, 2021 at 22:13:48, Piotr Nowojski (piotr.nowoj...@gmail.com) wrote:

Hi Meissner,

Can you clarify, are you talking about stateful functions? [1] Or the stream iterations [2]? The first e-mail suggests stateful functions, but the ticket that Kezhu created is talking about the latter.

Piotrek

[1] https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#iterations

On Sun, 28 Feb 2021 at 15:33, Kezhu Wang <kez...@gmail.com> wrote:

> Hi,
>
> You could also try `cancel --withSavepoint [savepointDir]` even though it is
> deprecated. Compared to the take-a-savepoint-and-then-cancel approach, there
> will be no checkpoints in between. This may be important if there are
> two-phase-commit operators in your job.
>
> Best,
> Kezhu Wang
>
> On February 28, 2021 at 20:50:29, Meissner, Dylan (
> dylan.t.meiss...@nordstrom.com) wrote:
>
> Thank you for opening the bug and including the extra context.
>
> I'll track the progress and, in the meantime, I will work around it by taking
> two separate actions when stopping the job: take a savepoint, then cancel.
> ------------------------------
> *From:* Kezhu Wang <kez...@gmail.com>
> *Sent:* Sunday, February 28, 2021 12:31 AM
> *To:* user@flink.apache.org <user@flink.apache.org>; Meissner, Dylan <
> dylan.t.meiss...@nordstrom.com>
> *Subject:* Re: Stateful functions 2.2 and stop with savepoint
>
> Hi,
>
> Thanks for reporting. I think it is a Flink bug and have created
> FLINK-21522 for it. You could track progress there.
>
> FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522
>
> Best,
> Kezhu Wang
>
> On February 28, 2021 at 00:59:04, Meissner, Dylan (
> dylan.t.meiss...@nordstrom.com) wrote:
>
> I have an embedded function with a SinkFunction as an egress, implemented
> as this pseudo-code:
>
> val serializationSchema = KafkaSchemaSerializationSchema(
>     ... props required to use a Confluent Schema Registry with Avro, auth etc ...)
> return SinkFunctionSpec(
>     EGRESS_ID,
>     FlinkKafkaProducer(serializationSchema, props, AT_LEAST_ONCE))
>
> Checkpointing and taking a savepoint without stopping work as expected.
>
> However, when I run "flink stop <job-id>" or even "flink stop --drain
> <job-id>", the operation never completes, reporting IN_PROGRESS until I hit
> the "failure-cause:
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
> before completing" CompletedException.
>
> In the "Checkpoint History" it shows only 2 of my 3 operators completed
> their work:
>
> Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | end-to-end duration: 638ms | data-size 1.38 KB
> feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | end-to-end duration: n/a | data-size: n/a
> feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B
>
> I've been unable to gain any insights from logs so far. Thoughts?
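
For readers who want to script the two-step workaround mentioned in this thread (take a savepoint, then cancel), here is a minimal Python sketch. It assumes the `flink` CLI is on the PATH and relies only on the `flink savepoint` and `flink cancel` subcommands. The helper names (`savepoint_command`, `cancel_command`, `stop_in_two_steps`) are hypothetical and not part of Flink.

```python
import subprocess
from typing import Callable, List, Optional


def savepoint_command(job_id: str, savepoint_dir: Optional[str] = None,
                      flink_bin: str = "flink") -> List[str]:
    """Build the argv for `flink savepoint <job-id> [savepointDir]`."""
    cmd = [flink_bin, "savepoint", job_id]
    if savepoint_dir:
        # Without a directory, the CLI falls back to the configured default.
        cmd.append(savepoint_dir)
    return cmd


def cancel_command(job_id: str, flink_bin: str = "flink") -> List[str]:
    """Build the argv for `flink cancel <job-id>`."""
    return [flink_bin, "cancel", job_id]


def stop_in_two_steps(job_id: str, savepoint_dir: Optional[str] = None,
                      run: Callable = subprocess.run) -> None:
    """Take a savepoint, then cancel the job (hypothetical helper).

    check=True makes a failed savepoint raise CalledProcessError,
    so the job is not cancelled unless the savepoint succeeded.
    """
    run(savepoint_command(job_id, savepoint_dir), check=True)
    run(cancel_command(job_id), check=True)
```

Calling `stop_in_two_steps("<job-id>", "<savepointDir>")` runs the two commands in order. As noted earlier in the thread, checkpoints may still be taken between the two steps, which can matter for jobs with two-phase-commit operators.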