I spent some time with the "Streaming Systems" [0] book over the weekend,
and it made me wonder whether my pipeline is doing "too much". The BigQuery
sink should already partition the data by day and put each row in the
right partition, so can the windowing step in the following pipeline be
left out?
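For context, "putting each row in the right partition" with daily partitioning can be pictured as routing the row to a partition decorator of the form table$yyyyMMdd. The plain-Java sketch below has no Beam dependency. The helper partitionFor and the table spec are hypothetical, and it assumes the day is computed in UTC, which is how BigQuery cuts daily partitions on a TIMESTAMP column. It is not necessarily what partitionedTableDynamicDestinations does.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionDecorator {
    // yyyyMMdd, the format used after the "$" partition decorator
    private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;

    // Hypothetical helper: append the daily partition decorator for an event
    // timestamp, taking the calendar day in UTC (assumption: UTC partitioning).
    static String partitionFor(String tableSpec, Instant eventTs) {
        String day = eventTs.atZone(ZoneOffset.UTC).toLocalDate().format(DAY);
        return tableSpec + "$" + day;
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2019-07-01T09:15:00Z");
        // prints my-project:events.user_events$20190701
        System.out.println(partitionFor("my-project:events.user_events", ts));
    }
}
```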
I am asking because I sometimes miss an element at the very edge of a
window, compared to a pipeline with a GCS sink, and I thought this might be
caused by doing the same thing twice (first windowing, and then the sink
"windowing" the data again):
pipeline
    .apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withReadCommitted()
            .withTimestampPolicyFactory(withEventTs)
            .commitOffsetsInFinalize())
    .apply(ParDo.of(new ToEventFn()))
    // DELETE the following up to
    .apply(
        Window.into(new ZurichTimePartitioningWindowFn())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(bundleSize),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(refreshFrequency))))
            .withAllowedLateness(Duration.standardDays(14))
            .discardingFiredPanes())
    // HERE - END of DELETION
    .apply(
        BigQueryIO.<Event>write()
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(refreshFrequency)
            .withNumFileShards(1)
            .to(partitionedTableDynamicDestinations)
            .withFormatFunction(
                (SerializableFunction<Event, TableRow>)
                    KafkaToBigQuery::convertUserEventToTableRow)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run().waitUntilFinish();
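One way the two "day" notions could disagree at the edges: if ZurichTimePartitioningWindowFn assigns elements to calendar days in Europe/Zurich time (an assumption based only on its name), while BigQuery's daily partitions on a TIMESTAMP column are cut at UTC midnight, then an event shortly after Zurich midnight belongs to one day for the window and to the previous day for the table partition. The plain-Java sketch below (class and method names are hypothetical) computes both days for such a timestamp; it only illustrates the time-zone offset and says nothing about what the Beam window or sink actually do.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class WindowEdge {
    // yyyyMMdd
    private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;

    // Calendar day of an instant as seen in the given time zone.
    static String dayIn(Instant eventTs, ZoneId zone) {
        return eventTs.atZone(zone).toLocalDate().format(DAY);
    }

    public static void main(String[] args) {
        // 22:30 UTC on 30 June is 00:30 on 1 July in Zurich (CEST, UTC+2).
        Instant ts = Instant.parse("2019-06-30T22:30:00Z");
        System.out.println("Zurich day: " + dayIn(ts, ZoneId.of("Europe/Zurich"))); // 20190701
        System.out.println("UTC day:    " + dayIn(ts, ZoneOffset.UTC));             // 20190630
    }
}
```

If this assumption holds, elements in the first one or two hours after Zurich midnight (depending on daylight saving time) are exactly the ones whose window day and partition day differ.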
Best,
Tobi
[0] http://streamingsystems.net/