Yes, I am using partitioned tables; that is why I was wondering whether
the windowing step could be skipped. :)
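
To make the "edge of a window" case concrete, here is a minimal sketch of how an event timestamp would map to a daily partition, assuming the partitions are aligned to Zurich local time and addressed with BigQuery's `table$yyyyMMdd` partition decorator. The class name, the method name and the table name "events" are hypothetical; this is not the code behind partitionedTableDynamicDestinations.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DayPartition {
    // Assumption: daily partitions follow Zurich local time, not UTC.
    private static final ZoneId ZURICH = ZoneId.of("Europe/Zurich");
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZURICH);

    /** Returns "table$yyyyMMdd" for the Zurich-local day that contains eventTime. */
    public static String partitionFor(String table, Instant eventTime) {
        return table + "$" + DAY.format(eventTime);
    }

    public static void main(String[] args) {
        // Two timestamps one millisecond apart, on either side of Zurich midnight (CET, UTC+1).
        Instant lastMillisOfDay = Instant.parse("2019-01-27T22:59:59.999Z");
        Instant firstMillisOfNextDay = Instant.parse("2019-01-27T23:00:00Z");
        System.out.println(partitionFor("events", lastMillisOfDay));      // events$20190127
        System.out.println(partitionFor("events", firstMillisOfNextDay)); // events$20190128
    }
}
```

If the sink derives the partition from the element timestamp in this way, an element at the boundary lands in the day given by its own timestamp, whether or not a window was assigned upstream.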

Cheers
Tobi

On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:

> My apologies, Tobi, I was too quick to hit the send button :-(
>
> I meant to ask whether you had also looked at partitioned tables in
> BigQuery, assuming the only partitioning you are doing is by day.
>
> Cheers
> Reza
>
> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>
>> Hi Tobi,
>>
>> Are you using partitioned tables in BigQuery, or sharded tables?
>>
>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>
>> Cheers
>>
>> Reza
>>
>>
>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> I spent some time with the "Streaming Systems" [0] book over the
>>> weekend, and it occurred to me that my pipeline might be doing redundant
>>> work: the BigQuery sink should already partition the data by day and put
>>> it in the right place. Can the windowing function in the following
>>> pipeline be left out?
>>>
>>> I am asking because I sometimes miss an element at the very edge of a
>>> window, compared to a pipeline with a GCS sink, and I thought this might
>>> be related to doing the same thing twice (windowing, and then the sink
>>> "windows" the data again):
>>>
>>> pipeline
>>>         .apply(
>>>             KafkaIO.<String, String>read()
>>>                 .withBootstrapServers(bootstrap)
>>>                 .withTopics(topics)
>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>                 .withValueDeserializer(ConfigurableDeserializer.class)
>>>                 .updateConsumerProperties(
>>>                     ImmutableMap.of(
>>>                         InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>                 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>                 .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>>                 .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>                 .withReadCommitted()
>>>                 .withTimestampPolicyFactory(withEventTs)
>>>                 .commitOffsetsInFinalize())
>>>         .apply(ParDo.of(new ToEventFn()))
>>>
>>> // DELETE the following up to
>>>         .apply(
>>>             Window.into(new ZurichTimePartitioningWindowFn())
>>>                 .triggering(
>>>                     Repeatedly.forever(
>>>                         AfterFirst.of(
>>>                             AfterPane.elementCountAtLeast(bundleSize),
>>>                             AfterProcessingTime.pastFirstElementInPane()
>>>                                 .plusDelayOf(refreshFrequency))))
>>>                 .withAllowedLateness(Duration.standardDays(14))
>>>                 .discardingFiredPanes())
>>> // HERE - END of DELETION
>>>
>>>         .apply(
>>>             BigQueryIO.<Event>write()
>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>                 .withTriggeringFrequency(refreshFrequency)
>>>                 .withNumFileShards(1)
>>>                 .to(partitionedTableDynamicDestinations)
>>>                 .withFormatFunction(
>>>                     (SerializableFunction<Event, TableRow>)
>>>                         KafkaToBigQuery::convertUserEventToTableRow)
>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>
>>>     pipeline.run().waitUntilFinish();
>>>
>>>
>>> Best,
>>> Tobi
>>>
>>> [0] http://streamingsystems.net/
>>>
>>
>>
>> --
>>
>> This email may be confidential and privileged. If you received this
>> communication by mistake, please don't forward it to anyone else, please
>> erase all copies and attachments, and please let me know that it has gone
>> to the wrong person.
>>
>> The above terms reflect a potential business arrangement, are provided
>> solely as a basis for further discussion, and are not intended to be and do
>> not constitute a legally binding obligation. No legally binding obligations
>> will be created, implied, or inferred until an agreement in final form is
>> executed in writing by all parties involved.
>>
>
>


-- 
Tobias Kaymak
Data Engineer

[email protected]
www.ricardo.ch
