I was spending some time with the "Streaming Systems" [0] book over
the weekend and I thought that my pipeline might be doing something "too
much" as the BigQuery sink already should partition the data by day and put
it in the right place - so can my windowing function in the following
pipeline be left out?

I am asking this since sometimes I miss an element at the very edge of a
window compared to a pipeline with a GCS sink and I thought maybe that this
is related to doing the same thing twice (windowing and then the sink does
"window" it again):

pipeline
        .apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrap)
                .withTopics(topics)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ConfigurableDeserializer.class)
                .updateConsumerProperties(

ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
inputMessagesConfig))

.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
                .updateConsumerProperties(ImmutableMap.of("group.id",
groupId))

.updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                .withReadCommitted()
                .withTimestampPolicyFactory(withEventTs)
                .commitOffsetsInFinalize())
        .apply(ParDo.of(new ToEventFn()))

// DELETE the following up to
        .apply(
            Window.into(new ZurichTimePartitioningWindowFn())

                .triggering(
                    Repeatedly.forever(
                        AfterFirst.of(
                            AfterPane.elementCountAtLeast(bundleSize),
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(refreshFrequency))))
                .withAllowedLateness(Duration.standardDays(14))
                .discardingFiredPanes())
// HERE - END of DELETION

        .apply(
            BigQueryIO.<Event>write()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(refreshFrequency)
                .withNumFileShards(1)
                .to(partitionedTableDynamicDestinations)
                .withFormatFunction(
                    (SerializableFunction<Event, TableRow>)
                        KafkaToBigQuery::convertUserEventToTableRow)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run().waitUntilFinish();


Best,
Tobi

[0] http://streamingsystems.net/

Reply via email to