Hey Jan,

I agree that silently ignoring the parameter is misleading and, in my case,
time-consuming.

I will gladly create the JIRA and PR. I do have some other things I want to
contribute to Beam. Will get to them soon.

On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:

> +dev <d...@beam.apache.org>
>
> Hi Cristian,
>
> the savepointPath should not be ignored. We need to verify whether the local
> environment supports savepoints (I suppose it does) and, in that case, we
> should use it. If it does not, we should throw an exception, as silently
> ignoring the savepoint is misleading.
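>
> A rough, untested sketch of what I mean (the surrounding variable names are
> approximate, and localEnvironmentSupportsSavepoints() stands in for the check
> we still need to answer):
>
> // somewhere in FlinkExecutionEnvironments, once the local environment is chosen
> // uses org.apache.flink.runtime.jobgraph.SavepointConfigOptions
> String savepointPath = options.getSavepointPath();
> if (savepointPath != null) {
>   if (localEnvironmentSupportsSavepoints()) {  // hypothetical check, to be verified
>     // Flink should pick this up when the embedded cluster starts the job
>     flinkConfiguration.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
>   } else {
>     throw new UnsupportedOperationException(
>         "--savepointPath is not supported by the embedded Flink environment");
>   }
> }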
>
> Would you file a JIRA? Or possibly create a PR to fix this?
>
> Best,
>
>  Jan
> On 2/3/22 07:12, Cristian Constantinescu wrote:
>
> Hi everyone,
>
> I've done some digging within the Beam source code. It looks like when the
> flinkMaster argument is not set, the savepointPath is not used at all. [1]
>
> In fact, the only place the savepointPath argument is used in all of
> Beam's source code is on lines 183 and 186 of the same file. [2]
>
> Of course, I did all my testing locally on my dev box with the embedded
> Flink cluster that Beam starts, which, from the looks of it, does NOT use
> the savepointPath at all.
>
> If someone familiar with the code can confirm this finding, I can update
> the documentation to explicitly state that savepoint resuming is not
> supported locally.
>
> I will do more testing around this with a real Flink cluster and see if
> the behavior is different from the one described in my first email.
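>
> For reference, this is roughly how I plan to point the pipeline at a real
> cluster instead of the embedded one (the host, port, and path below are
> placeholders, not my actual values):
>
> // FlinkPipelineOptions and FlinkRunner come from org.apache.beam.runners.flink
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
> options.setRunner(FlinkRunner.class);
> options.setFlinkMaster("flink-jobmanager:8081");           // real cluster instead of [local]
> options.setSavepointPath("/path/to/retained/checkpoint");  // what is ignored locally today
> Pipeline pipeline = Pipeline.create(options);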
>
> Thanks,
> Cristian
>
> [1]
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
> [2]
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>
> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu <zei...@gmail.com>
> wrote:
>
>> Hey Pavel,
>>
>> Thanks for the quick reply. Pardon me, as I cannot copy/paste straight
>> from the IDE; I'm copying by hand:
>>
>> KafkaIO.<String, Pojo>read()
>>     .withBootstrapServers("address")
>>     .withTopic("topic")
>>     .withKeyDeserializer(StringDeserializer.class)
>>     .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
>>     .withConsumerConfigUpdates(map)
>>     .withReadCommitted()
>>     .commitOffsetsInFinalize()
>>     .withProcessingTime();
>>
>>
>> The config map is:
>> enable.auto.commit -> false
>> group.id -> some group
>> auto.offset.reset -> earliest
>> specific.avro.reader -> false
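>>
>> In code, that map is built roughly like this (hand-copied again, and the
>> group id is a placeholder):
>>
>> // constants from org.apache.kafka.clients.consumer.ConsumerConfig
>> Map<String, Object> map = new HashMap<>();
>> map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // enable.auto.commit
>> map.put(ConsumerConfig.GROUP_ID_CONFIG, "some-group");        // group.id
>> map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // auto.offset.reset
>> map.put("specific.avro.reader", false);                       // Confluent Avro reader flag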
>>
>>
>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin <p.o.solo...@gmail.com>
>> wrote:
>>
>>> Hello Cristian,
>>>
>>> Thanks for posting the detailed scenario of your experiments here. I
>>> think it may be important to share your KafkaIO configuration too. For
>>> example, are you setting this config at all?
>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <zei...@gmail.com>
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I'm trying to figure out how pipeline state works with Beam running on
>>>> Flink Classic. Would appreciate some help with the below.
>>>>
>>>> My understanding is that on recovery (whether from a checkpoint or
>>>> savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
>>>> with whatever state they had when the pipeline crashed. For example, the
>>>> Kafka operator might contain the latest *safe* offset to restart from. But
>>>> I'm not seeing this when I introduce exceptions in the pipeline.
>>>>
>>>> My pipeline is as follows:
>>>> 1. Read a Kafka topic from the start.
>>>> 2. Have a DoFn that stores all incoming messages in a BagState.
>>>> 3. The above DoFn sets a timer that fires only after a few checkpoints have
>>>> been created and retained because of --externalizedCheckpointsEnabled=true.
>>>> The timer handler then outputs the buffered elements to the next operator,
>>>> in this case KafkaIO.Write (a sketch of this DoFn follows the list).
>>>> 4. Before the timer in #3 fires, manually trigger an exception (listen to
>>>> another Kafka topic and throw whenever a new message comes in).
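>>>>
>>>> A trimmed-down sketch of the DoFn from steps 2 and 3 (the class name, output
>>>> key, and the 5-minute delay are made up for illustration; it uses
>>>> org.apache.beam.sdk.state.*, org.apache.beam.sdk.values.KV, and
>>>> org.joda.time.Duration):
>>>>
>>>> class BufferAndFlushFn extends DoFn<KV<String, Pojo>, KV<String, Pojo>> {
>>>>
>>>>   @StateId("buffer")
>>>>   private final StateSpec<BagState<Pojo>> bufferSpec = StateSpecs.bag();
>>>>
>>>>   @TimerId("flush")
>>>>   private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>
>>>>   @StartBundle
>>>>   public void startBundle() {
>>>>     // logging here is how I check whether the DoFn is (re)started after a restore
>>>>   }
>>>>
>>>>   @ProcessElement
>>>>   public void process(
>>>>       @Element KV<String, Pojo> element,
>>>>       @StateId("buffer") BagState<Pojo> buffer,
>>>>       @TimerId("flush") Timer flush) {
>>>>     buffer.add(element.getValue());
>>>>     // long enough for a few checkpoints to complete before the timer fires
>>>>     flush.offset(Duration.standardMinutes(5)).setRelative();
>>>>   }
>>>>
>>>>   @OnTimer("flush")
>>>>   public void onFlush(
>>>>       @StateId("buffer") BagState<Pojo> buffer,
>>>>       OutputReceiver<KV<String, Pojo>> out) {
>>>>     for (Pojo p : buffer.read()) {
>>>>       out.output(KV.of("flush", p));  // goes to KafkaIO.write() downstream
>>>>     }
>>>>     buffer.clear();
>>>>   }
>>>> }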
>>>>
>>>> What I observe:
>>>> 1. In #4 above, Flink retries processing twice and then stops the pipeline
>>>> (because numberOfExecutionRetries=2; the full set of Flink options I run
>>>> with is sketched after this list).
>>>> 2. After the pipeline is stopped, I see that the checkpoints are retained in
>>>> the configured directory.
>>>> 3. If I restart the pipeline (with --savepointPath=<path to the latest
>>>> checkpoint from the first run>):
>>>> 3a. No messages are read from Kafka, because the Kafka reader reached
>>>> the end of the topic during the first run.
>>>> 3b. StartBundle is not executed for my DoFn, indicating that the DoFn
>>>> isn't even started.
>>>> 3c. The timer in #3 never fires, hence there is data loss, as the
>>>> elements I had in the DoFn state are never processed.
>>>> 4. If I manually reset the offset to the start of the topic and restart
>>>> the pipeline (with --savepointPath=<path to the latest checkpoint from the
>>>> first run>):
>>>> 4a. StartBundle methods are called.
>>>> 4b. In ProcessElement, the BagState is empty on the first received
>>>> message. If I'm restoring from a checkpoint/savepoint, I would expect this
>>>> state to be populated.
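>>>>
>>>> For completeness, the options behind the flags mentioned above look roughly
>>>> like this in code (the interval and path are from memory, not exact):
>>>>
>>>> FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>>>> options.setRunner(FlinkRunner.class);
>>>> options.setStreaming(true);
>>>> options.setCheckpointingInterval(10_000L);                 // checkpoint every 10s
>>>> options.setExternalizedCheckpointsEnabled(true);           // keep checkpoints when the job fails
>>>> options.setNumberOfExecutionRetries(2);
>>>> options.setSavepointPath("/path/to/retained/checkpoint");  // only set on the restart runs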
>>>>
>>>> Is this correct behaviour? Am I doing something wrong?
>>>>
>>>> Thanks,
>>>> Cristian
>>>>
>>>> Other quirks I found:
>>>> a. If KafkaIO.Read is configured to read from the latest offset, and
>>>> there is an exception thrown in the pipeline before the first checkpoint
>>>> happens (let's say on the first message that comes in), then when Flink
>>>> retries, KafkaIO reads from the latest offset again. That means the message
>>>> that caused the exception is not reprocessed. On the other hand, if the
>>>> exception is thrown after the first checkpoint, that message will be tried
>>>> twice (because numberOfExecutionRetries=2), and then the pipeline will
>>>> exit. I think this is working as designed, but it feels a little weird that
>>>> the behaviour is different depending on whether there's a checkpoint or not.
>>>>
>>>> b. When KafkaIO.Write is configured with .withEOS(number, "group"), and
>>>> there is an exception thrown in the pipeline, the Flink job doesn't exit. I
>>>> think there is a Kafka producer in KafkaExactlyOnceSink that is not closed
>>>> correctly.
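>>>>
>>>> For reference, the sink in (b) is configured roughly like this (the topic,
>>>> value serializer class, and shard count are placeholders, not my real values):
>>>>
>>>> KafkaIO.<String, Pojo>write()
>>>>     .withBootstrapServers("address")
>>>>     .withTopic("output-topic")
>>>>     .withKeySerializer(StringSerializer.class)
>>>>     .withValueSerializer(PojoAvroSerializer.class)  // placeholder serializer class
>>>>     .withEOS(1, "group");                           // exactly-once sink, 1 shard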
>>>>
>>>
