Hi Tobias,

Checkpointing should be enabled when you set it in the Flink config or via the Beam option `checkpointingInterval`. Did you set `runner` to `FlinkRunner`?
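For reference, a minimal sketch of setting that option programmatically via the Flink runner's `FlinkPipelineOptions` (the class name `CheckpointedPipeline` is made up for illustration, and the interval just mirrors your launch flag):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointedPipeline {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // The default interval is -1 (checkpointing disabled);
    // any positive value enables Flink checkpoints at that period.
    options.setCheckpointingInterval(300_000L); // milliseconds
    Pipeline pipeline = Pipeline.create(options);
    // ... build and run the pipeline as usual ...
  }
}
```

This is equivalent to passing `--runner=FlinkRunner --checkpointingInterval=300000` on the command line; if the runner is not set, the options are parsed but never reach Flink, which would explain seeing no checkpoints.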

If possible, could you share parts of the Flink logs?

Thanks,
Max

On 25.01.19 15:14, Kaymak, Tobias wrote:
Hi,

I am trying to migrate my existing KafkaToGCS pipeline to a KafkaToBigQuery pipeline, to skip the GCS loading step, which is currently handled outside of Beam.

I noticed that the pipeline, written in Beam 2.9.0 (Java), does not trigger any checkpoints on Flink (1.5.5), even though it is configured to do so when I launch it. Is this normal? How does Beam then guarantee exactly-once semantics when there are no checkpoints in Flink? (During my tests it seemed to start from scratch after a crash, but I am not 100% sure.)


This is my pipeline:

  pipeline
         .apply(
             KafkaIO.<String, String>read()
                 .withBootstrapServers(bootstrap)
                 .withTopics(topics)
                 .withKeyDeserializer(StringDeserializer.class)
                 .withValueDeserializer(ConfigurableDeserializer.class)
                 .updateConsumerProperties(
                     ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
                 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
                 .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
                 .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                 .withReadCommitted()
                 .withTimestampPolicyFactory(withEventTs)
                 .commitOffsetsInFinalize())
         .apply(ParDo.of(new ToEventFn()))
         .apply(
             Window.into(new ZurichTimePartitioningWindowFn())
                 .triggering(
                     Repeatedly.forever(
                         AfterFirst.of(
                             AfterPane.elementCountAtLeast(bundleSize),
                             AfterProcessingTime.pastFirstElementInPane()
                                 .plusDelayOf(refreshFrequency))))
                 .withAllowedLateness(Duration.standardDays(14))
                 .discardingFiredPanes())
         .apply(
             BigQueryIO.<Event>write()
                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                 .withTriggeringFrequency(refreshFrequency)
                 .withNumFileShards(1)
                 .to(partitionedTableDynamicDestinations)
                 .withFormatFunction(
                     (SerializableFunction<Event, TableRow>)
                         KafkaToBigQuery::convertUserEventToTableRow)
                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

     pipeline.run().waitUntilFinish();

It's launched like the other (GCS) one via:

...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000 --parallelism=1 --tempLocation=gs://foo..

Any idea why checkpointing does not work here?

Best,
Tobias
