Hi everyone, I did a little more testing.
Passing the "-s" flag to the Flink CLI when submitting the job correctly restores it from the given checkpoint:

    flink run --detached -s "checkpoint path" -c Appclass fat.jar --streaming --numberOfExecutionRetries=2 ... other flink parameters as specified in [1]

Passing the Beam Flink pipeline option "savepoint_path" to the Flink CLI when submitting the job does NOT correctly restore it from the given checkpoint, for the same reason as before: "flink_master" is not being set.

    flink run --detached -c Appclass fat.jar --streaming --numberOfExecutionRetries=2 --savepoint_path="checkpoint path" etc...

However, setting "flink_master" doesn't seem to work with the Flink CLI; it fails with "The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell or TestEnvironment)."

    flink run --detached -c Appclass fat.jar --streaming --numberOfExecutionRetries=2 --flink_master=localhost:portnumber --savepoint_path="checkpoint path" etc...

As a workaround for now, I will use the -s parameter with the Flink CLI.

I'm surprised that this only just came up; I'd think that restoring from a savepoint/checkpoint with Flink and Beam is a pretty common usage scenario. I guess people use the Flink parameters when possible. In my case, I prefer to find the latest checkpoint/savepoint in code and call options.setSavepointPath(path) programmatically before Pipeline.create(options) is called.

Cheers,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/

On Fri, Feb 4, 2022 at 10:17 AM Cristian Constantinescu <zei...@gmail.com> wrote:

> Hey Jan,
>
> I agree that silently ignoring the parameter is misleading and, in my
> case, time-consuming.
>
> I will gladly create the JIRA and PR. I do have some other things I want
> to contribute to Beam. Will get to them soon.
>
> On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> +dev <d...@beam.apache.org>
>>
>> Hi Cristian,
>>
>> the savepointPath should not be ignored.
>> We need to verify whether the local environment supports savepoints (I
>> suppose it does), and in that case we should use it. If it does not, we
>> should throw an exception, as silently ignoring the savepoint is misleading.
>>
>> Would you file a JIRA? Or possibly create a PR to fix this?
>>
>> Best,
>>
>> Jan
>>
>> On 2/3/22 07:12, Cristian Constantinescu wrote:
>>
>> Hi everyone,
>>
>> I've done some digging within the Beam source code. It looks like when
>> the flinkMaster argument is not set, the savepointPath is not used at
>> all. [1]
>>
>> In fact, the only time the savepointPath argument is used within all of
>> Beam's source code is on lines 183 and 186 of the same file. [2]
>>
>> Of course, I did all my testing locally on my dev box with the embedded
>> Flink cluster that Beam starts, which, from the looks of it, does NOT use
>> the savepointPath at all.
>>
>> If someone familiar with the code can confirm this finding, I can update
>> the documentation to explicitly state that savepoint resuming is not
>> supported locally.
>>
>> I will do more testing around this with a real Flink cluster and see if
>> the behavior is different from the one described in my first email.
>>
>> Thanks,
>> Cristian
>>
>> [1] https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
>> [2] https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>>
>> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu <zei...@gmail.com>
>> wrote:
>>
>>> Hey Pavel,
>>>
>>> Thanks for the quick reply.
>>> Pardon me as I cannot copy/paste straight from the IDE; copying by hand:
>>>
>>> KafkaIO.<Pojo>read()
>>>     .withBootstrapServers("address")
>>>     .withTopic("topic")
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
>>>     .withConsumerConfigUpdates(map)
>>>     .withReadCommitted()
>>>     .commitOffsetsInFinalize()
>>>     .withProcessingTime();
>>>
>>> The config map is:
>>> enable.auto.commit -> false
>>> group.id -> some group
>>> auto.offset.reset -> earliest
>>> specific.avro.reader -> false
>>>
>>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin <p.o.solo...@gmail.com>
>>> wrote:
>>>
>>>> Hello Cristian,
>>>>
>>>> Thanks for posting the detailed scenario of your experiments here. I
>>>> think it may be important to share your KafkaIO configuration here too.
>>>> For example, are you setting this config anyhow?
>>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>
>>>> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <zei...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I'm trying to figure out how pipeline state works with Beam running on
>>>>> Flink Classic. I would appreciate some help with the below.
>>>>>
>>>>> My understanding is that on recovery (whether from a checkpoint or a
>>>>> savepoint), Flink recreates the operators (I guess those are DoFns in
>>>>> Beam) with whatever state they had when the pipeline crashed. For
>>>>> example, the Kafka operator might contain the latest *safe* offset to
>>>>> restart from. But I'm not seeing this when I introduce exceptions in
>>>>> the pipeline.
>>>>>
>>>>> My pipeline is as follows:
>>>>> 1. Read a Kafka topic from the start.
>>>>> 2. Have a DoFn that stores all incoming messages in a BagState.
>>>>> 3. The above DoFn sets a timer in such a way that it fires only after
>>>>> a few checkpoints have been created and kept because of
>>>>> --externalizedCheckpointsEnabled=true. The timer handler then outputs
>>>>> the elements to the next operator, in this case KafkaIO.Write.
>>>>> 4. Before the timer in #3 fires, manually trigger an exception (listen
>>>>> to another Kafka topic, and throw any time a new message comes in).
>>>>>
>>>>> What I observe:
>>>>> 1. In #4 above, Flink tries to process the exception twice, then stops
>>>>> the pipeline (because numberOfExecutionRetries=2).
>>>>> 2. After the pipeline is stopped, I see the checkpoints are kept in
>>>>> the configured directory.
>>>>> 3. If I restart the pipeline (with --savepointPath=<path to latest
>>>>> checkpoint from first run>):
>>>>> 3a. No messages are read from Kafka, because the Kafka reader reached
>>>>> the end of the topic during the first run.
>>>>> 3b. StartBundle is not executed for my DoFn, indicating that the DoFn
>>>>> isn't even started.
>>>>> 3c. The timer in #3 never fires, hence there is data loss, as the
>>>>> elements I had in my DoFn state are never processed.
>>>>> 4. If I manually reset the offset to the start of the topic and
>>>>> restart the pipeline (with --savepointPath=<path to latest checkpoint
>>>>> from first run>):
>>>>> 4a. StartBundle methods are called.
>>>>> 4b. In ProcessElement, the BagState is empty on the first received
>>>>> message. If I'm restoring from a checkpoint/savepoint, I would expect
>>>>> this state to be filled.
>>>>>
>>>>> Is this correct behaviour? Am I doing something wrong?
>>>>>
>>>>> Thanks,
>>>>> Cristian
>>>>>
>>>>> Other quirks I found:
>>>>> a. If KafkaIO.Read is configured to read from the latest offset, and
>>>>> there is an exception thrown in the pipeline before the first
>>>>> checkpoint happens (let's say on the first message that comes in),
>>>>> then when Flink retries, KafkaIO reads from the latest offset again.
>>>>> That means the message that caused the exception is not reprocessed.
>>>>> On the other hand, if the exception is thrown after the first
>>>>> checkpoint, that message will be tried twice (because
>>>>> numberOfExecutionRetries=2), and then the pipeline will exit. I think
>>>>> this is working as designed, but it feels a little weird that the
>>>>> behaviour is different depending on whether there's a checkpoint or not.
>>>>>
>>>>> b. When KafkaIO.Write is configured with .withEOS(number, "group"),
>>>>> and there is an exception thrown in the pipeline, the Flink job
>>>>> doesn't exit. I think there is a Kafka producer in
>>>>> KafkaExactlyOnceSink that is not closed correctly.
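
[Editor's note] The workaround described at the top of the thread is to locate the latest checkpoint in code and pass it to options.setSavepointPath(path) before Pipeline.create(options). A minimal sketch of the lookup step, assuming Flink's externalized checkpoints are written as "chk-<N>" subdirectories under the job's checkpoint directory (the class and method names here are hypothetical, not part of Beam or Flink):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestCheckpoint {
    // Scans the given directory and returns the "chk-<N>" subdirectory
    // with the highest N, i.e. the most recent retained checkpoint.
    public static Optional<Path> find(Path checkpointRoot) throws IOException {
        try (Stream<Path> entries = Files.list(checkpointRoot)) {
            return entries
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    .max(Comparator.comparingLong(
                            p -> Long.parseLong(p.getFileName().toString().substring(4))));
        }
    }
}
```

The returned path would then be handed to the Flink runner before pipeline construction, e.g. latest.ifPresent(p -> options.setSavepointPath(p.toString())).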