Hey Jan, I agree that silently ignoring the parameter is misleading and, in my case, time-consuming.
I will gladly create the JIRA and PR. I do have some other things I want to contribute to Beam; I will get to them soon.

On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:

> +dev <d...@beam.apache.org>
>
> Hi Cristian,
>
> the savepointPath should not be ignored. We need to verify whether the local environment supports savepoints (I suppose it does), and in that case we should use it. If it does not, we should throw an exception, because silently ignoring the savepoint is misleading.
>
> Would you file a JIRA? Or possibly create a PR to fix this?
>
> Best,
>
> Jan
>
> On 2/3/22 07:12, Cristian Constantinescu wrote:
>
> Hi everyone,
>
> I've done some digging within the Beam source code. It looks like when the flinkMaster argument is not set, the savepointPath is not used at all. [1]
>
> In fact, the only place the savepointPath argument is used within all of Beam's source code is on lines 183 and 186 of the same file. [2]
>
> Of course, I did all my testing locally on my dev box with the embedded Flink cluster that Beam starts, which from the looks of it does NOT use the savepointPath at all.
>
> If someone familiar with the code can confirm this finding, I can update the documentation to explicitly state that savepoint resuming is not supported locally.
>
> I will do more testing around this with a real Flink cluster and see if the behavior is different from the one described in my first email.
>
> Thanks,
> Cristian
>
> [1] https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
> [2] https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>
> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu <zei...@gmail.com> wrote:
>
>> Hey Pavel,
>>
>> Thanks for the quick reply. Pardon me, as I cannot copy/paste straight from the IDE; copying by hand:
>>
>> KafkaIO.<Pojo>read()
>>     .withBootstrapServers("address")
>>     .withTopic("topic")
>>     .withKeyDeserializer(StringDeserializer.class)
>>     .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
>>     .withConsumerConfigUpdates(map)
>>     .withReadCommitted()
>>     .commitOffsetsInFinalize()
>>     .withProcessingTime();
>>
>> The config map is:
>> enable.auto.commit -> false
>> group.id -> some group
>> auto.offset.reset -> earliest
>> specific.avro.reader -> false
>>
>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin <p.o.solo...@gmail.com> wrote:
>>
>>> Hello Cristian,
>>>
>>> Thanks for posting the detailed scenario of your experiments here. I think it may be important to share your KafkaIO configuration here too. For example, are you setting this config anyhow?
>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <zei...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I'm trying to figure out how pipeline state works with Beam running on Flink Classic. I would appreciate some help with the below.
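For reference while reading the rest of the thread, here is a minimal, hedged sketch (not taken from the thread) of how the Flink runner options discussed above, flinkMaster, savepointPath, externalized checkpoints, and execution retries, might be set from code. The cluster address, savepoint path, and class name are placeholders.

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SavepointRestoreOptionsExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Per the FlinkExecutionEnvironments links above, savepointPath appears to be
    // applied only when a remote environment is created, i.e. when flinkMaster
    // points at a real cluster rather than the embedded one.
    options.setFlinkMaster("localhost:8081");              // placeholder cluster address
    options.setSavepointPath("/savepoints/savepoint-xyz"); // placeholder savepoint path
    options.setExternalizedCheckpointsEnabled(true);
    options.setNumberOfExecutionRetries(2);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline here ...
    pipeline.run().waitUntilFinish();
  }
}

The same options can also be passed on the command line (for example --flinkMaster=... and --savepointPath=...), which is how they appear later in this thread.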
>>>> My understanding is that on recovery (whether from a checkpoint or a savepoint), Flink recreates the operators (I guess those are DoFns in Beam) with whatever state they had when the pipeline crashed. For example, the Kafka operator might contain the latest *safe* offset to restart from. But I'm not seeing this when I introduce exceptions in the pipeline.
>>>>
>>>> My pipeline is as follows:
>>>> 1. Read a Kafka topic from the start.
>>>> 2. Have a DoFn that stores all incoming messages in a BagState.
>>>> 3. The above DoFn sets a timer so that it fires only after a few checkpoints have been created and kept (because of --externalizedCheckpointsEnabled=true). The timer handler then outputs the elements to the next operator, in this case KafkaIO.Write.
>>>> 4. Before the timer in #3 fires, manually trigger an exception (listen to another Kafka topic, and throw any time a new message comes in).
>>>>
>>>> What I observe:
>>>> 1. In #4 above, Flink tries to process the exception twice and then stops the pipeline (because numberOfExecutionRetries=2).
>>>> 2. After the pipeline is stopped, I see the checkpoints are kept in the configured directory.
>>>> 3. If I restart the pipeline (with --savepointPath=<path to the latest checkpoint from the first run>):
>>>> 3a. No messages are read from Kafka, because the Kafka reader reached the end of the topic during the first run.
>>>> 3b. StartBundle is not executed for my DoFn, indicating that the DoFn isn't even started.
>>>> 3c. The timer in #3 is never executed; hence there is data loss, as the elements I had in my DoFn state are never processed.
>>>> 4. If I manually reset the offset to the start of the topic and restart the pipeline (with --savepointPath=<path to the latest checkpoint from the first run>):
>>>> 4a. StartBundle methods are called.
>>>> 4b. In ProcessElement, the BagState is empty on the first received message. If I'm restoring from a checkpoint/savepoint, I would expect this state to be filled.
>>>>
>>>> Is this correct behaviour? Am I doing something wrong?
>>>>
>>>> Thanks,
>>>> Cristian
>>>>
>>>> Other quirks I found:
>>>> a. If KafkaIO.Read is configured to read from the latest offset, and an exception is thrown in the pipeline before the first checkpoint happens (say, on the first message that comes in), then when Flink retries, KafkaIO reads from the latest offset again. That means the message that caused the exception is not reprocessed. On the other hand, if the exception is thrown after the first checkpoint, that message will be tried twice (because numberOfExecutionRetries=2), and then the pipeline will exit. I think this is working as designed, but it feels a little weird that the behaviour differs depending on whether there is a checkpoint or not.
>>>> b. When KafkaIO.Write is configured with .withEOS(number, "group"), and an exception is thrown in the pipeline, the Flink job doesn't exit. I think there is a Kafka producer in KafkaExactlyOnceSink that is not closed correctly.
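For anyone trying to reproduce the scenario in steps 2 and 3 of the quoted message, here is a minimal, hedged sketch of a buffering stateful DoFn with a processing-time timer. The class, state and timer names, the element types, and the five-minute delay are illustrative, not taken from the thread.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Buffers incoming values in BagState and flushes them when a processing-time timer fires.
public class BufferAndFlushFn extends DoFn<KV<String, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flush) {
    // Buffer the element; after a restore from a checkpoint/savepoint this state
    // would be expected to come back populated.
    buffer.add(element.getValue());
    // Fire the flush timer far enough in the future for several checkpoints to complete first.
    flush.offset(Duration.standardMinutes(5)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("buffer") BagState<String> buffer, OutputReceiver<String> out) {
    // Emit everything buffered so far, then clear the state.
    for (String value : buffer.read()) {
      out.output(value);
    }
    buffer.clear();
  }
}

Note that stateful DoFns require keyed (KV) input, so upstream the Kafka records would need to be keyed (for example with WithKeys) before applying ParDo.of(new BufferAndFlushFn()).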