Hi,

I am trying to migrate my existing KafkaToGCS pipeline to a KafkaToBigQuery
pipeline, to skip the GCS-to-BigQuery loading step, which is currently
handled outside of Beam.

I noticed that the pipeline, written in Beam 2.9.0 (Java), does not trigger
any checkpoints on Flink (1.5.5), even though it's configured to do so when I
launch it. Is this normal? How does Beam then guarantee exactly-once
semantics when there are no checkpoints in Flink? (In my tests it seems to
start from scratch after a crash, but I am not 100% sure.)


This is my pipeline:

    pipeline
        .apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrap)
                .withTopics(topics)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ConfigurableDeserializer.class)
                .updateConsumerProperties(
                    ImmutableMap.of(
                        InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
                .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
                .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                .withReadCommitted()
                .withTimestampPolicyFactory(withEventTs)
                .commitOffsetsInFinalize())
        .apply(ParDo.of(new ToEventFn()))
        .apply(
            Window.into(new ZurichTimePartitioningWindowFn())
                .triggering(
                    Repeatedly.forever(
                        AfterFirst.of(
                            AfterPane.elementCountAtLeast(bundleSize),
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(refreshFrequency))))
                .withAllowedLateness(Duration.standardDays(14))
                .discardingFiredPanes())
        .apply(
            BigQueryIO.<Event>write()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(refreshFrequency)
                .withNumFileShards(1)
                .to(partitionedTableDynamicDestinations)
                .withFormatFunction(
                    (SerializableFunction<Event, TableRow>)
                        KafkaToBigQuery::convertUserEventToTableRow)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run().waitUntilFinish();

It's launched like the other (GCS) one via:

...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000
--parallelism=1 --tempLocation=gs://foo..
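For context, the full invocation looks roughly like the sketch below; the jar
name and main class are placeholders, the flags are the ones I actually pass:

flink run -c com.example.KafkaToBigQuery pipeline-bundled.jar \
  --runner=FlinkRunner \
  --checkpointingMode=EXACTLY_ONCE \
  --checkpointingInterval=300000 \
  --parallelism=1 \
  --tempLocation=gs://foo..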

Any idea why checkpointing does not work here?

Best,
Tobias
