Hi Tobi,

Are you using partitioned tables or sharded tables in BigQuery?
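To make the distinction concrete: a day-partitioned table is a single table whose daily partitions can be addressed with a `$YYYYMMDD` partition decorator, while date-sharded tables are separate tables whose names carry a date suffix. The Java sketch below derives both kinds of table spec (in the `project:dataset.table` form that BigQueryIO accepts) from an event timestamp. It assumes the day boundary is Zurich local time, which the name ZurichTimePartitioningWindowFn in Tobi's pipeline suggests (its code is not shown). The class, method, project, dataset and table names are hypothetical and not taken from the original pipeline.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: builds daily BigQuery table specs for an event timestamp. */
public final class DailyTableNames {
  // Assumption: days are cut in Zurich local time, not UTC.
  private static final ZoneId ZURICH = ZoneId.of("Europe/Zurich");
  // BASIC_ISO_DATE renders a LocalDate as yyyyMMdd, e.g. 20190128.
  private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;

  private DailyTableNames() {}

  /** Calendar day of the event in Zurich, formatted as yyyyMMdd. */
  static String zurichDay(Instant eventTime) {
    LocalDate day = eventTime.atZone(ZURICH).toLocalDate();
    return day.format(DAY);
  }

  /** Partitioned table: one table, one partition per day, addressed with a "$" decorator. */
  static String partitionSpec(String project, String dataset, String table, Instant eventTime) {
    return project + ":" + dataset + "." + table + "$" + zurichDay(eventTime);
  }

  /** Sharded tables: one separate table per day, distinguished by a name suffix. */
  static String shardSpec(String project, String dataset, String table, Instant eventTime) {
    return project + ":" + dataset + "." + table + "_" + zurichDay(eventTime);
  }

  public static void main(String[] args) {
    // 23:30 UTC on 27 Jan 2019 is already 00:30 on 28 Jan in Zurich (UTC+1 in winter).
    Instant t = Instant.parse("2019-01-27T23:30:00Z");
    System.out.println(partitionSpec("my-project", "events", "user_events", t));
    System.out.println(shardSpec("my-project", "events", "user_events", t));
  }
}
```

The example in `main` shows why the time zone matters near a day boundary: an event at 23:30 UTC lands on the next day in Zurich, so whichever step chooses the day (a window function or a dynamic-destinations function) should probably use the same zone consistently.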
https://cloud.google.com/bigquery/docs/partitioned-tables

Cheers,
Reza

On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <[email protected]> wrote:

> I was spending some time with the "Streaming Systems" [0] book over
> the weekend and I thought that my pipeline might be doing something "too
> much" as the BigQuery sink already should partition the data by day and put
> it in the right place - so can my windowing function in the following
> pipeline be left out?
>
> I am asking this since sometimes I miss an element at the very edge of a
> window compared to a pipeline with a GCS sink and I thought maybe that this
> is related to doing the same thing twice (windowing and then the sink does
> "window" it again):
>
> pipeline
>     .apply(
>         KafkaIO.<String, String>read()
>             .withBootstrapServers(bootstrap)
>             .withTopics(topics)
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(ConfigurableDeserializer.class)
>             .updateConsumerProperties(
>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>             .withReadCommitted()
>             .withTimestampPolicyFactory(withEventTs)
>             .commitOffsetsInFinalize())
>     .apply(ParDo.of(new ToEventFn()))
>
>     // DELETE the following up to
>     .apply(
>         Window.into(new ZurichTimePartitioningWindowFn())
>             .triggering(
>                 Repeatedly.forever(
>                     AfterFirst.of(
>                         AfterPane.elementCountAtLeast(bundleSize),
>                         AfterProcessingTime.pastFirstElementInPane()
>                             .plusDelayOf(refreshFrequency))))
>             .withAllowedLateness(Duration.standardDays(14))
>             .discardingFiredPanes())
>     // HERE - END of DELETION
>
>     .apply(
>         BigQueryIO.<Event>write()
>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>             .withTriggeringFrequency(refreshFrequency)
>             .withNumFileShards(1)
>             .to(partitionedTableDynamicDestinations)
>             .withFormatFunction(
>                 (SerializableFunction<Event, TableRow>)
>                     KafkaToBigQuery::convertUserEventToTableRow)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
> pipeline.run().waitUntilFinish();
>
>
> Best,
> Tobi
>
> [0] http://streamingsystems.net/

--
This email may be confidential and privileged. If you received this communication by mistake, please don't forward it to anyone else, please erase all copies and attachments, and please let me know that it has gone to the wrong person.

The above terms reflect a potential business arrangement, are provided solely as a basis for further discussion, and are not intended to be and do not constitute a legally binding obligation. No legally binding obligations will be created, implied, or inferred until an agreement in final form is executed in writing by all parties involved.
