Re: Beam State with the Flink Runner when things go wrong

2022-02-04 Thread Cristian Constantinescu
Hi everyone,

I did a little more testing.

Passing the "-s" flag to the Flink CLI when submitting the job correctly
restores it from the given checkpoint:
flink run --detached -s "checkpoint path" -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2 ... other pipeline options as specified in [1]

Passing the Beam Flink pipeline option "savepoint_path" when submitting the
job through the Flink CLI does NOT restore it from the given checkpoint, for
the same reason as before: "flink_master" is not being set.
flink run --detached -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2 --savepoint_path="checkpoint path" etc...

However, setting "flink_master" doesn't work with the Flink CLI either; I get
"The RemoteEnvironment cannot be instantiated when running in a
pre-defined context (such as Command Line Client, Scala Shell or
TestEnvironment)."
flink run --detached -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2 --flink_master=localhost:portnumber
--savepoint_path="checkpoint path" etc...

As a workaround for now, I will use the -s parameter with the Flink CLI.
I'm surprised this is only coming up now; I'd think that restoring from a
savepoint/checkpoint with Flink and Beam is a pretty common usage scenario.
I guess people use the Flink parameters when possible. In my case, I prefer
to find the latest checkpoint/savepoint in code and call
options.setSavepointPath(path) programmatically before the
Pipeline.create(options) call is made.
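
Roughly like this (a simplified sketch; findLatestCheckpoint() is just a
placeholder for however one locates the newest externalized checkpoint
directory, and per the findings above this only takes effect when
flink_master points at an actual cluster):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Inside main(String[] args):
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);

// Restore from the newest checkpoint/savepoint, if there is one.
String latest = findLatestCheckpoint("/path/to/checkpoints");
if (latest != null) {
  options.setSavepointPath(latest);
}
Pipeline pipeline = Pipeline.create(options);

// Placeholder helper: newest chk-* directory under the checkpoint base dir.
static String findLatestCheckpoint(String baseDir) throws IOException {
  try (Stream<Path> dirs = Files.walk(Paths.get(baseDir), 2)) {
    return dirs
        .filter(p -> Files.isDirectory(p) && p.getFileName().toString().startsWith("chk-"))
        .max(Comparator.comparingLong(p -> p.toFile().lastModified()))
        .map(Path::toString)
        .orElse(null);
  }
}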

Cheers,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/


On Fri, Feb 4, 2022 at 10:17 AM Cristian Constantinescu wrote:

> Hey Jan,
>
> I agree that silently ignoring the parameter is misleading and, in my
> case, time consuming.
>
> I will gladly create the JIRA and PR. I do have some other things I want
> to contribute to Beam. Will get to them soon.
>
> On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský  wrote:
>
>> +dev 
>>
>> Hi Cristian,
>>
>> the savepointPath should not be ignored. We need to verify whether the local
>> environment supports savepoints (I suppose it does) and, in that case, we
>> should use it. If it does not, we should throw an exception, as silently
>> ignoring the savepoint is misleading.
>>
>> Would you file a JIRA? Or possibly create a PR to fix this?
>>
>> Best,
>>
>>  Jan
>> On 2/3/22 07:12, Cristian Constantinescu wrote:
>>
>> Hi everyone,
>>
>> I've done some digging within the Beam source code. It looks like when
>> the flinkMaster argument is not set, the savepointPath is not used at all.
>> [1]
>>
>> In fact the only time the savepointPath argument is used within all of
>> Beam's source code is on lines 183 and 186 of the same file. [2]
>>
>> Of course, I did all my testing locally on my dev box with the embedded
>> Flink cluster that Beam starts, which from the looks of it, does NOT use
>> the savepointPath at all.
>>
>> If someone familiar with the code can confirm this finding, I can update
>> the documentation to explicitly state that savepoint resuming is not
>> supported locally.
>>
>> I will do more testing around this with a real Flink cluster and see if
>> the behavior is different than the one described in my first email.
>>
>> Thanks,
>> Cristian
>>
>> [1]
>> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
>> [2]
>> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>>
>> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu wrote:
>>
>>> Hey Pavel,
>>>
>>> Thanks for the quick reply. Pardon me as I cannot copy/paste straight
>>> from the IDE, copying by hand:
>>>
>>> KafkaIO.read()
>>>     .withBootstrapServers("address")
>>>     .withTopic("topic")
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
>>>     .withConsumerConfigUpdates(map)
>>>     .withReadCommitted()
>>>     .commitOffsetsInFinalize()
>>>     .withProcessingTime();
>>>
>>>
>>> The config map is:
>>> enable.auto.commit -> false
>>> group.id -> some group
>>> auto.offset.reset -> earliest
>>> specific.avro.reader -> false
>>>
>>>
>>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin wrote:
>>>
Hello Cristian,

 Thanks for posting here the detailed scenario of your experiments. I
 think it may be important to share your KafkaIO configuration here too. For
 example, are you setting this config anyhow?
 https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--

 Best Regards,
 Pavel Solomin

 Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
 





On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu wrote:

> Hi everyone,
>
> 

Re: Beam State with the Flink Runner when things go wrong

2022-02-04 Thread Jan Lukavský

+dev 

Hi Cristian,

the savepointPath should not be ignored. We need to verify whether the local
environment supports savepoints (I suppose it does) and, in that case, we
should use it. If it does not, we should throw an exception, as silently
ignoring the savepoint is misleading.
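
Just to illustrate what I have in mind (a rough sketch only, not the actual
Beam code, and assuming the local environment honors Flink's
"execution.savepoint.path" option), the local-environment branch could do
something along these lines:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// options are the FlinkPipelineOptions being translated.
Configuration config = new Configuration();
if (options.getSavepointPath() != null) {
  // Ask Flink to restore the job from the given savepoint/checkpoint path.
  config.setString("execution.savepoint.path", options.getSavepointPath());
}
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(parallelism, config);
// If the local environment turns out not to support restoring, we should
// throw here instead of silently ignoring getSavepointPath().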


Would you file a JIRA? Or possibly create a PR to fix this?

Best,

 Jan

On 2/3/22 07:12, Cristian Constantinescu wrote:

Hi everyone,

I've done some digging within the Beam source code. It looks like when 
the flinkMaster argument is not set, the savepointPath is not used at 
all. [1]


In fact the only time the savepointPath argument is used within all of 
Beam's source code is on lines 183 and 186 of the same file. [2]


Of course, I did all my testing locally on my dev box with the 
embedded Flink cluster that Beam starts, which from the looks of it, 
does NOT use the savepointPath at all.


If someone familiar with the code can confirm this finding, I can 
update the documentation to explicitly state that savepoint resuming 
is not supported locally.


I will do more testing around this with a real Flink cluster and see 
if the behavior is different than the one described in my first email.


Thanks,
Cristian

[1] 
https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
[2] 
https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183


On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu wrote:


Hey Pavel,

Thanks for the quick reply. Pardon me as I cannot copy/paste
straight from the IDE, copying by hand:

KafkaIO.read()
    .withBootstrapServers("address")
    .withTopic("topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
    .withConsumerConfigUpdates(map)
    .withReadCommitted()
    .commitOffsetsInFinalize()
    .withProcessingTime();


The config map is:
enable.auto.commit -> false
group.id  -> some group
auto.offset.reset -> earliest
specific.avro.reader -> false
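
For reference, the same settings as they would be built and handed to
KafkaIO (illustrative only; the keys are standard Kafka consumer /
Confluent deserializer properties):

import java.util.HashMap;
import java.util.Map;

Map<String, Object> map = new HashMap<>();
map.put("enable.auto.commit", false);
map.put("group.id", "some group");
map.put("auto.offset.reset", "earliest");
map.put("specific.avro.reader", false);
// passed to the reader via .withConsumerConfigUpdates(map)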


On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin wrote:

Hello Cristian,

Thanks for posting here the detailed scenario of your
experiments. I think it may be important to share your KafkaIO
configuration here too. For example, are you setting this
config anyhow?

https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin







On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu wrote:

Hi everyone,

I'm trying to figure out how pipeline state works with
Beam running on Flink Classic. Would appreciate some help
with the below.

My understanding is that on recovery (whether from a
checkpoint or savepoint), Flink recreates the operators (I
guess those are DoFns in Beam) with whatever state they
had when the pipeline crashed. For example the Kafka
operator might contain the latest *safe* offset to restart
from. But I'm not seeing this when I introduce exceptions
in the pipeline.

My pipeline is as follows:
1. Read a Kafka topic from start
2. Have a DoFn that stores all incoming messages in a BagState
3. The above DoFn sets a timer in such a way that it fires only after a few
checkpoints have been created and kept because of
--externalizedCheckpointsEnabled = true. The timer handler then outputs the
buffered elements to the next operator, in this case KafkaIO.Write (a rough
sketch of such a DoFn follows after this list).
4. Before the timer in #3 fires, manually trigger an exception (listen to
another Kafka topic, and throw any time a new message comes in)
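
For reference, here is roughly what that buffering DoFn looks like (a
simplified sketch with illustrative names, assuming KV<String, String>
elements, not the exact code I run):

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class BufferAndFlushFn extends DoFn<KV<String, String>, KV<String, String>> {

  // Buffers every incoming element until the timer fires.
  @StateId("buffer")
  private final StateSpec<BagState<KV<String, String>>> bufferSpec =
      StateSpecs.bag(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

  // Processing-time timer set far enough out that a few checkpoints
  // complete (and are kept) before it fires.
  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<KV<String, String>> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(element);
    flush.offset(Duration.standardMinutes(5)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("buffer") BagState<KV<String, String>> buffer,
      OutputReceiver<KV<String, String>> out) {
    // Emit everything buffered so far to the next operator, then clear.
    for (KV<String, String> kv : buffer.read()) {
      out.output(kv);
    }
    buffer.clear();
  }
}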

What I observe:
1. In #4 above, Flink tries to process the exception twice and then stops
the pipeline (because numberOfExecutionRetries = 2)
2. After the pipeline is stopped, I see the checkpoints are kept in the
configured directory
3. If I restart the pipeline (with --savepointPath = <checkpoint from first
run>):
3a. No messages are read from Kafka, because the Kafka reader reached the
end of the topic during the first run
3b. StartBundles are not executed for my DoFn, indicating that the DoFn
isn't even started
3c. The timer in #3 is never executed, hence there is data loss as