Hi everyone,

I did a little more testing.

Passing the Flink "-s" flag to Flink CLI to submit the job correctly
restores it from the given checkpoint:
flink run --detached -s "checkpoint path" -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2 ... other flink parameters as specified in [1]

Passing the beam Flink pipeline option "savepoint_path" to the Flink CLI to
submit the job does NOT correctly restore it from the given checkpoint. For
the same reason as before, "flink_master" is not being set.
flink run --detached -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2  --savepoint_path="checkpoint path" etc...

However, setting "flink_master" doesn't seem to work with the Flink CLI,
getting "The RemoteEnvironment cannot be instantiated when running in a
pre-defined context (such as Command Line Client, Scala Shell or
TestEnvironment)."
flink run --detached -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2  --flink_master=localhost:portnumber
--savepoint_path="checkpoint path" etc...

As a workaround for now, I will use the -s parameter with the Flink CLI.
I'm surprised that this just came up, I'd think that restoring from
savepoint/checkpoint with Flink and Beam is a pretty common usage scenario.
I guess people would use the Flink parameters when possible. In my case, I
prefer to find the latest checkpoint/savepoint in code and use
options.setSavepointPath(path) programmatically before
Pipeline.create(options) call is made.

Cheers,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/


On Fri, Feb 4, 2022 at 10:17 AM Cristian Constantinescu <zei...@gmail.com>
wrote:

> Hey Jan,
>
> I agree that silently ignoring the parameter is misleading and, in my
> case, time consuming.
>
> I will gladly create the JIRA and PR. I do have some other things I want
> to contribute to Beam. Will get to them soon.
>
> On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> +dev <d...@beam.apache.org>
>>
>> Hi Cristian,
>>
>> the savepointPath should not be ignored. We need to verify if local
>> environment supports savepoints (I suppose it does) and in that case we
>> should use it. In the case it does not we should throw exception as silent
>> ignoring of the savepoint is misleading.
>>
>> Would you file a JIRA? Or possibly create a PR to fix this?
>>
>> Best,
>>
>>  Jan
>> On 2/3/22 07:12, Cristian Constantinescu wrote:
>>
>> Hi everyone,
>>
>> I've done some digging within the Beam source code. It looks like when
>> the flinkMaster argument is not set, the savepointPath is not used at all.
>> [1]
>>
>> In fact the only time the savepointPath argument is used within all of
>> Beam's source code is on lines 183 and 186 of the same file. [2]
>>
>> Of course, I did all my testing locally on my dev box with the embedded
>> Flink cluster that Beam starts, which from the looks of it, does NOT use
>> the savepointPath at all.
>>
>> If someone familiar with the code can confirm this finding, I can update
>> the documentation to explicitly state that savepoint resuming is not
>> supported locally.
>>
>> I will do more testing around this with a real Flink cluster and see if
>> the behavior is different than the one described in my first email.
>>
>> Thanks,
>> Cristian
>>
>> [1]
>> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
>> [2]
>> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>>
>> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu <zei...@gmail.com>
>> wrote:
>>
>>> Hey Pavel,
>>>
>>> Thanks for the quick reply. Pardon me as I cannot copy/paste straight
>>> from the IDE, copying by hand:
>>>
>>> KafkaIO.<Pojo>read()
>>> .withBootStrapServer("address")
>>> .withTopic("topic")
>>>
>>> .withKeyDeserializer(StringDeserializer.class)
>>>
>>> .withValueDeserializer(ConfluentSchemaRegistryDesProvider.of(...))
>>>
>>> .withConsumerConfigUpdates(map)
>>> .withReadCommitted()
>>> .commitOffsetInFinalize()
>>>
>>> .withProcessingTime();
>>>
>>>
>>> The config map is:
>>> enable.auto.commit -> false
>>> group.id -> some group
>>> auto.offset.reset -> earliest
>>> specific.avro.reader -> false
>>>
>>>
>>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin <p.o.solo...@gmail.com>
>>> wrote:
>>>
>>>> Hello Christian,
>>>>
>>>> Thanks for posting here the detailed scenario of your experiments. I
>>>> think it may be important to share your KafkaIO configuration here too. For
>>>> example, are you setting this config anyhow?
>>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <zei...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I'm trying to figure out how pipeline state works with Beam running on
>>>>> Flink Classic. Would appreciate some help with the below.
>>>>>
>>>>> My understanding is that on recovery (whether from a checkpoint or
>>>>> savepoint), Flink recreates the operators (I guess those are DoFns in 
>>>>> Beam)
>>>>> with whatever state they had when the pipeline crashed. For example the
>>>>> Kafka operator might contain the latest *safe* offset to restart from. But
>>>>> I'm not seeing this when I introduce exceptions in the pipeline.
>>>>>
>>>>> My pipeline is as follows:
>>>>> 1. Read a Kafka topic from start
>>>>> 2. Have a DoFn that stores all incoming messages in a BagState
>>>>> 3. Above DoFn triggers a timer set in such a way that it triggers
>>>>> after there are a few checkpoints created and kept because of
>>>>> --externalizeCheckpointsEnabled = true. This timer handler then outputs 
>>>>> the
>>>>> elements to the next operator, in this case KafkaIo.Write.
>>>>> 4. Before the timer in #3 is executed manually trigger an exception
>>>>> (listen to another kafka topic, and throw any time a new message comes in)
>>>>>
>>>>> What I observe:
>>>>> 1. In #4 above Flink tries to process the exception twice then stops
>>>>> the pipeline (because numberOfExecutionRetries =2 )
>>>>> 2. After the pipeline is stopped, I see the checkpoints are kept in
>>>>> the configured directory
>>>>> 3. If I restart the pipeline (with --savepointPath = <path to latest
>>>>> checkpoint from first run>):
>>>>> 3a. No messages are read from kafka, because the Kafka reader reached
>>>>> the end of the topic during the first run
>>>>> 3b. StartBundles are not executed for my DoFn. Indicating that the
>>>>> DoFn isn't even started
>>>>> 3c. The timer in #3 is never executed, hence there is data loss as the
>>>>> elements I had in my DoFn state are never processed
>>>>> 4. If I manually reset the offset to the start of the topic and
>>>>> restart the pipeline (with --savepointPath = <path to latest checkpoint
>>>>> from first run>):
>>>>> 4a. StartBundle methods are called
>>>>> 4b. In ProcessElement, the BagState is empty on the first received
>>>>> message. If I'm restoring from a checkpoint/savepoint, I would expect this
>>>>> state to be filled.
>>>>>
>>>>> Is this correct behaviour? Am I doing something wrong?
>>>>>
>>>>> Thanks,
>>>>> Cristian
>>>>>
>>>>> Other quirks I found:
>>>>> a. If KafkaIO.Read is configured to read from the latest offset, and
>>>>> there is an exception thrown in the pipeline before the first checkpoint
>>>>> happens (let's say on the first message that comes in), when Flink retries
>>>>> KafkaIO reads from the latest offset again. That means that the message
>>>>> that caused the exception is not reprocessed. On the other hand, if the
>>>>> exception is thrown after the first checkpoint, that message will be tried
>>>>> twice (because numberOfExecutionRetries =2 ), and then the pipeline will
>>>>> exit. I think this is working as designed but it feels a little weird
>>>>> that the behaviour is different depending if there's a checkpoint or not.
>>>>>
>>>>> b. When KafkaIO.Write is configured with .withEOS(number, "group"),
>>>>> and there is an exception thrown in the pipeline, the Flink job doesn't
>>>>> exit. I think there is a kafka producer in KafkaExactlyOnceSink that is 
>>>>> not
>>>>> closed correctly.
>>>>>
>>>>

Reply via email to