+dev <mailto:d...@beam.apache.org>

Hi Cristian,

The savepointPath should not be ignored. We need to verify whether the local environment supports savepoints (I suppose it does), and in that case we should use it. If it does not, we should throw an exception, as silently ignoring the savepoint is misleading.
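
For illustration, I mean something along these lines (the supportsSavepointRestore helper is hypothetical, not existing Beam code):

    // Hypothetical guard in the Flink runner's environment setup: fail loudly
    // instead of silently dropping --savepointPath when it cannot be honored.
    if (savepointPath != null && !supportsSavepointRestore(flinkMaster)) {
      throw new IllegalArgumentException(
          "--savepointPath=" + savepointPath
              + " was set, but the '" + flinkMaster
              + "' execution environment does not support restoring from a savepoint.");
    }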

Would you file a JIRA? Or possibly create a PR to fix this?

Best,

 Jan

On 2/3/22 07:12, Cristian Constantinescu wrote:
Hi everyone,

I've done some digging within the Beam source code. It looks like when the flinkMaster argument is not set, the savepointPath is not used at all. [1]

In fact, the only places the savepointPath argument is used anywhere in Beam's source code are lines 183 and 186 of the same file. [2]

Of course, I did all my testing locally on my dev box with the embedded Flink cluster that Beam starts, which, from the looks of it, does NOT use the savepointPath at all.

If someone familiar with the code can confirm this finding, I can update the documentation to explicitly state that savepoint resuming is not supported locally.

I will do more testing around this with a real Flink cluster and see if the behavior is different from the one described in my first email.
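
For reference, this is roughly how I intend to run against a real cluster so the savepointPath is actually picked up (the flinkMaster address and savepoint path below are placeholders):

    // Sketch of the pipeline options for a run against a real Flink cluster;
    // the master address and savepoint/checkpoint path are placeholders.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=FlinkRunner",
                "--flinkMaster=localhost:8081",             // not the embedded [local] cluster
                "--savepointPath=/tmp/checkpoints/chk-42",  // retained checkpoint or savepoint
                "--externalizedCheckpointsEnabled=true",
                "--numberOfExecutionRetries=2")
            .as(FlinkPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);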

Thanks,
Cristian

[1] https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
[2] https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183

On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu <zei...@gmail.com> wrote:

    Hey Pavel,

    Thanks for the quick reply. Pardon me, as I cannot copy/paste
    straight from the IDE; I'm copying it by hand:

        KafkaIO.<String, Pojo>read()
            .withBootstrapServers("address")
            .withTopic("topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
            .withConsumerConfigUpdates(map)
            .withReadCommitted()
            .commitOffsetsInFinalize()
            .withProcessingTime();


    The config map is:
    enable.auto.commit -> false
    group.id -> some group
    auto.offset.reset -> earliest
    specific.avro.reader -> false
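
    In code, the map I pass to withConsumerConfigUpdates looks roughly like
    this (the group id is a placeholder):

        // Consumer config passed to withConsumerConfigUpdates(map), matching the list above.
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // enable.auto.commit
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "some-group");        // group.id
        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // auto.offset.reset
        map.put("specific.avro.reader", false);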


    On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin
    <p.o.solo...@gmail.com> wrote:

        Hello Cristian,

        Thanks for posting the detailed scenario of your
        experiments here. I think it may be important to share your KafkaIO
        configuration here too. For example, are you setting this
        config at all?
        https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--

        Best Regards,
        Pavel Solomin

        Tel: +351 962 950 692| Skype: pavel_solomin | Linkedin
        <https://www.linkedin.com/in/pavelsolomin>

        On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu
        <zei...@gmail.com> wrote:

            Hi everyone,

            I'm trying to figure out how pipeline state works with
            Beam running on Flink Classic. Would appreciate some help
            with the below.

            My understanding is that on recovery (whether from a
            checkpoint or a savepoint), Flink recreates the operators (I
            guess those are DoFns in Beam) with whatever state they
            had when the pipeline crashed. For example, the Kafka
            operator might contain the latest *safe* offset to restart
            from. But I'm not seeing this when I introduce exceptions
            in the pipeline.

            My pipeline is as follows:
            1. Read a Kafka topic from the start.
            2. Have a DoFn that stores all incoming messages in a BagState.
            3. The above DoFn sets a timer such that it fires after a
            few checkpoints have been created and kept because of
            --externalizedCheckpointsEnabled = true. The timer handler
            then outputs the elements to the next operator, in this
            case KafkaIO.Write. (A rough sketch of this DoFn follows
            the list.)
            4. Before the timer in #3 fires, manually trigger an
            exception (listen to another Kafka topic and throw any
            time a new message comes in).
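
            For clarity, the DoFn in steps 2 and 3 looks roughly like this
            (imports omitted; Pojo and the flush delay are placeholders):

                // Rough sketch of the buffering DoFn from steps 2 and 3: it stores every
                // element in a BagState and flushes it from a processing-time timer.
                class BufferingFn extends DoFn<KV<String, Pojo>, KV<String, Pojo>> {

                  @StateId("buffer")
                  private final StateSpec<BagState<Pojo>> bufferSpec = StateSpecs.bag();

                  @TimerId("flush")
                  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

                  @ProcessElement
                  public void process(
                      @Element KV<String, Pojo> element,
                      @StateId("buffer") BagState<Pojo> buffer,
                      @TimerId("flush") Timer flush) {
                    buffer.add(element.getValue());
                    // Fire only after a few checkpoint intervals have passed.
                    flush.offset(Duration.standardMinutes(5)).setRelative();
                  }

                  @OnTimer("flush")
                  public void onFlush(
                      OnTimerContext ctx, @StateId("buffer") BagState<Pojo> buffer) {
                    for (Pojo p : buffer.read()) {
                      ctx.output(KV.of("key", p)); // handed to KafkaIO.write() downstream
                    }
                    buffer.clear();
                  }
                }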

            What I observe:
            1. In #4 above, Flink tries to process the exception twice,
            then stops the pipeline (because numberOfExecutionRetries = 2).
            2. After the pipeline is stopped, I see the checkpoints
            are kept in the configured directory.
            3. If I restart the pipeline (with --savepointPath = <path
            to latest checkpoint from first run>):
            3a. No messages are read from Kafka, because the Kafka
            reader reached the end of the topic during the first run.
            3b. StartBundle is not executed for my DoFn, indicating
            that the DoFn isn't even started.
            3c. The timer in #3 is never executed, hence there is data
            loss, as the elements I had in my DoFn state are never
            processed.
            4. If I manually reset the offset to the start of the
            topic and restart the pipeline (with --savepointPath =
            <path to latest checkpoint from first run>):
            4a. StartBundle methods are called.
            4b. In ProcessElement, the BagState is empty on the first
            received message. If I'm restoring from a
            checkpoint/savepoint, I would expect this state to be filled.

            Is this correct behaviour? Am I doing something wrong?

            Thanks,
            Cristian

            Other quirks I found:
            a. If KafkaIO.Read is configured to read from the latest
            offset, and an exception is thrown in the pipeline
            before the first checkpoint happens (let's say on the
            first message that comes in), then when Flink retries,
            KafkaIO reads from the latest offset again. That means the
            message that caused the exception is not reprocessed. On
            the other hand, if the exception is thrown after the first
            checkpoint, that message will be tried twice (because
            numberOfExecutionRetries = 2), and then the pipeline will
            exit. I think this is working as designed, but it feels a
            little weird that the behaviour differs depending on
            whether there's a checkpoint or not.

            b. When KafkaIO.Write is configured with .withEOS(number,
            "group") and an exception is thrown in the
            pipeline, the Flink job doesn't exit. I think there is a
            Kafka producer in KafkaExactlyOnceSink that is not closed
            correctly.
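
            For reference, the write configuration I mean is roughly the
            following (the serializer classes, shard count, and sink group
            id are placeholders):

                // Roughly the exactly-once KafkaIO write configuration referred to above.
                KafkaIO.<String, Pojo>write()
                    .withBootstrapServers("address")
                    .withTopic("topic")
                    .withKeySerializer(StringSerializer.class)
                    .withValueSerializer(PojoSerializer.class) // hypothetical serializer class
                    .withEOS(5, "eos-sink-group");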
