Hi,

I am sharing my experience with you after trying to use the following
pipeline
logic (with Beam 2.6.0 - running on Flink 1.5):

1. Reading from KafkaIO, attaching a timestamp extracted from each parsed element
2. Filtering bad records
3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
every 15 minutes

I had a working pipeline that did not write to BigQuery directly, but to
Cloud Storage, so its 3rd step was

3. Writing files to GCS in daily "subdirectories" (roughly as sketched below)
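
For context, that old step 3 looked roughly like this (simplified sketch only;
the bucket path and the toJson helper are placeholders, not my actual code):

  events
        .apply(Window.<Event>into(CalendarWindows.days(1)))        // one window per day
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((Event event) -> toJson(event)))                  // placeholder serializer
        .apply(TextIO.write()
            .to("gs://my-bucket/events/daily")                     // placeholder output path
            .withWindowedWrites()                                  // one output group per day window
            .withNumShards(1));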

I tried to rewrite the pipeline to reduce complexity: resetting its state
should no longer require thinking about what to delete on GCS, and I also
wanted configurable refresh times directly from within the pipeline. The only
thing I needed to change was the output at the end, so I knew my parsing logic
would not change, which should reduce the risk.

I tested the pipeline on our test cluster and it looked promising. When I
deployed it last week everything seemed to go smoothly. On Friday I noticed
that I had holes in the data: in the BigQuery tables there were missing days
(the tricky part was that the recent days looked fine). (To be sure, I reset
the pipeline and read each Kafka topic from the very beginning. In different
runs, different days were missing.) I spent the weekend rolling back the
changes and trying to figure out what was going on.

I didn't see any errors in the logs (the log level was set to WARNING for most
parts), but I thought: well, maybe it's because there are too many partitions
and BigQuery has a limit of 1000 partition operations per day. So I started
reading from just 90 days in the past, but I still had holes (whole days).
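
The 90-day cut-off itself was nothing fancy, roughly a filter on the extracted
event timestamp, something like this (illustrative only, the Event getter is a
placeholder and not the exact code):

  Instant cutoff = Instant.now().minus(Duration.standardDays(90));
  events = events.apply(
      Filter.by((Event event) -> !event.getTimestamp().isBefore(cutoff)));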

I had a windowing step that I needed for the GCS pipeline; I became aware that
I wouldn't need it anymore with BigQueryIO, so I commented it out and tested
again, without luck.

What struck me was that the Flink cluster didn't do any checkpoints for the
pipeline that was using BigQueryIO - it does so when writing to GCS, and I
tested its failure logic there. Additionally, the graph in Flink with
BigQueryIO becomes very complex, but this is something I expected.
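
For reference, by "checkpoints" I mean the ones enabled through the Flink
runner options, roughly like this (the interval value here is just an
example, not our exact setting):

  FlinkPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
  options.setRunner(FlinkRunner.class);
  options.setCheckpointingInterval(60_000L);   // checkpoint every 60 seconds
  Pipeline pipeline = Pipeline.create(options);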

Here is the Pipeline code with the commented out windowing part:

  pipeline
        .apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrap)
                .withTopics(topics)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ConfigurableDeserializer.class)
                .updateConsumerProperties(
                    ImmutableMap.of(
                        InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
                .updateConsumerProperties(ImmutableMap.of("group.id", "di-beam-consumers"))
                .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                .withTimestampPolicyFactory(
                    TimestampPolicyFactory.withTimestampFn(
                        new MessageTimestampExtractor(inputMessagesConfig)))
                .withReadCommitted()
                .commitOffsetsInFinalize())
        .apply(ParDo.of(new ToEventFn()))
        //        .apply(
        //            Window.into(new ZurichTimePartitioningWindowFn())
        //                .triggering(
        //                    Repeatedly.forever(
        //                        AfterFirst.of(
        //                            AfterPane.elementCountAtLeast(bundleSize),
        //                            AfterProcessingTime.pastFirstElementInPane()
        //                                .plusDelayOf(refreshFrequency))))
        //                .withAllowedLateness(Duration.standardDays(1))
        //                .discardingFiredPanes())
        .apply(
            BigQueryIO.<Event>write()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(refreshFrequency)
                .withNumFileShards(1)
                .to(partitionedTableDynamicDestinations)
                .withFormatFunction(
                    (SerializableFunction<Event, TableRow>)
                        KafkaToBigQuery::convertUserEventToTableRow)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
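
The partitionedTableDynamicDestinations is not shown above; it is a small
DynamicDestinations implementation that routes each Event to a daily partition.
Roughly (simplified sketch - the project/dataset names and the schema constant
are placeholders, not the real values):

  static class PartitionedTableDestinations
      extends DynamicDestinations<Event, String> {

    @Override
    public String getDestination(ValueInSingleWindow<Event> element) {
      // day partition derived from the element timestamp, e.g. "20180901"
      return element.getTimestamp().toString(DateTimeFormat.forPattern("yyyyMMdd"));
    }

    @Override
    public TableDestination getTable(String day) {
      // "$<day>" addresses the daily partition of the target table
      return new TableDestination(
          "my-project:my_dataset.events$" + day,          // placeholder table spec
          "events for " + day,
          new TimePartitioning().setType("DAY"));
    }

    @Override
    public TableSchema getSchema(String day) {
      return EVENT_SCHEMA; // shared TableSchema constant
    }
  }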


I have the feeling that I must be making some serious and dumb mistakes, as I
know the Beam framework is very robust. Thanks for taking the time to read this.

Tobi
