Thanks Tobi, very interesting! Sorry for the long gaps in email; I am based
out of Singapore :-)

I wonder if this is coming from the use of .to(
partitionedTableDynamicDestinations), which I am guessing accesses the
IntervalWindow. With time-column-based partitioning in my pipeline, I just
use the URI of the table.
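
For reference, a minimal sketch of that pattern, assuming a hypothetical
table my-project:my_dataset.events partitioned on an event_ts TIMESTAMP
column (the table, column, and the rows PCollection<TableRow> are my
placeholders, not from this thread). Because BigQuery routes each row by the
column value, the sink never needs an IntervalWindow:

    import com.google.api.services.bigquery.model.TimePartitioning;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

    // No Window.into step and no DynamicDestinations; a plain table spec
    // ("URI") is enough when partitioning on a TIMESTAMP column:
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.events") // hypothetical table
            // BigQuery assigns each row to a partition by this column:
            .withTimePartitioning(
                new TimePartitioning().setType("DAY").setField("event_ts"))
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));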

And thanks for raising the question in general; looking around, I could not
find a nice write-up of the possible BigQueryIO patterns with the various
partitioned and sharded table options that are now available. I will put
something together over the next week or so and post back here; might see
if it's something that would suit a short blog post as well.

Cheers

Reza



On Tue, 29 Jan 2019 at 19:56, Kaymak, Tobias <[email protected]>
wrote:

> I am using FILE_LOADS and no timestamp column, but partitioned tables. In
> this case I have seen the following error when I comment out the windowing
> (direct runner):
>
> Exception in thread "main"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.ClassCastException:
> org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to
> org.apache.beam.sdk.transforms.windowing.IntervalWindow
>
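
The cast that fails is presumably inside the destinations function passed to
.to(). A hypothetical sketch of that shape (the real
partitionedTableDynamicDestinations is not shown in this thread; the Event
type comes from the pipeline quoted further down, and the table name is an
assumption):

    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.format.DateTimeFormat;

    class DayPartitionDestinations extends DynamicDestinations<Event, String> {
      @Override
      public String getDestination(ValueInSingleWindow<Event> element) {
        // With no Window.into upstream the element arrives in the
        // GlobalWindow, and this cast throws the ClassCastException above:
        IntervalWindow window = (IntervalWindow) element.getWindow();
        String day =
            DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC().print(window.start());
        // "$day" is BigQuery's partition decorator; table name is assumed:
        return "my-project:my_dataset.events$" + day;
      }

      @Override
      public TableDestination getTable(String tableSpec) {
        return new TableDestination(tableSpec, null);
      }

      @Override
      public TableSchema getSchema(String tableSpec) {
        return new TableSchema(); // real schema assumed to be defined elsewhere
      }
    }
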
> On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni <[email protected]> wrote:
>
>> Also, my BQ table was partitioned based on a TIMESTAMP column, rather than
>> using ingestion-time-based partitioning.
>>
>> Cheers
>> Reza
>>
>> On Tue, 29 Jan 2019 at 15:44, Reza Rokni <[email protected]> wrote:
>>
>>> Hiya Tobi,
>>>
>>> When you mention it failed, do you mean you get an error on running the
>>> pipeline, or is there an incorrect data issue?
>>>
>>> I was just trying some things with a PubSub source and a partitioned
>>> table sink and was able to push things through. It was a very simple
>>> pipeline, though, with BigQueryIO.to() set to a simple string.
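
For reference, a minimal sketch of such a pipeline, assuming a hypothetical
topic, table, and a hypothetical ToTableRowFn DoFn (none of these names are
from the thread):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.transforms.ParDo;

    pipeline
        .apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"))
        .apply(ParDo.of(new ToTableRowFn())) // hypothetical String -> TableRow DoFn
        .apply(
            BigQueryIO.writeTableRows()
                // "simple string" destination: just the table spec
                .to("my-project:my_dataset.events")
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));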
>>>
>>> Cheers
>>>
>>> Reza
>>>
>>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias <[email protected]>
>>> wrote:
>>>
>>>> But it seems that it fails when I remove the windowing from the
>>>> pipeline, so I guess the answer is no.
>>>>
>>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <
>>>> [email protected]> wrote:
>>>>
>>>>> Yes, I am making use of partitioned tables; that's why I was wondering
>>>>> if the windowing step could be skipped. :)
>>>>>
>>>>> Cheers
>>>>> Tobi
>>>>>
>>>>> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>>>>>
>>>>>> My apologies Tobi, I was too quick to hit the send button :-(
>>>>>>
>>>>>> I was checking to ask if you had also looked at partitioned tables in
>>>>>> BigQuery, assuming the only partitioning you are doing is by day.
>>>>>>
>>>>>> Cheers
>>>>>> Reza
>>>>>>
>>>>>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Tobi,
>>>>>>>
>>>>>>> Are you making use of partitioned tables in BigQuery or sharded tables?
>>>>>>>
>>>>>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> Reza
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I was spending some time with the "Streaming Systems" [0] book over
>>>>>>>> the weekend and I thought that my pipeline might be doing something
>>>>>>>> "too much", as the BigQuery sink should already partition the data by
>>>>>>>> day and put it in the right place - so can my windowing function in
>>>>>>>> the following pipeline be left out?
>>>>>>>>
>>>>>>>> I am asking this since sometimes I miss an element at the very edge
>>>>>>>> of a window compared to a pipeline with a GCS sink, and I thought
>>>>>>>> maybe this is related to doing the same thing twice (windowing, and
>>>>>>>> then the sink "windows" it again):
>>>>>>>>
>>>>>>>> pipeline
>>>>>>>>     .apply(
>>>>>>>>         KafkaIO.<String, String>read()
>>>>>>>>             .withBootstrapServers(bootstrap)
>>>>>>>>             .withTopics(topics)
>>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>             .updateConsumerProperties(
>>>>>>>>                 ImmutableMap.of(
>>>>>>>>                     InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>>>>>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>             .withReadCommitted()
>>>>>>>>             .withTimestampPolicyFactory(withEventTs)
>>>>>>>>             .commitOffsetsInFinalize())
>>>>>>>>     .apply(ParDo.of(new ToEventFn()))
>>>>>>>>     // DELETE the following up to
>>>>>>>>     .apply(
>>>>>>>>         Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>>>             .triggering(
>>>>>>>>                 Repeatedly.forever(
>>>>>>>>                     AfterFirst.of(
>>>>>>>>                         AfterPane.elementCountAtLeast(bundleSize),
>>>>>>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>                             .plusDelayOf(refreshFrequency))))
>>>>>>>>             .withAllowedLateness(Duration.standardDays(14))
>>>>>>>>             .discardingFiredPanes())
>>>>>>>>     // HERE - END of DELETION
>>>>>>>>     .apply(
>>>>>>>>         BigQueryIO.<Event>write()
>>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>             .withTriggeringFrequency(refreshFrequency)
>>>>>>>>             .withNumFileShards(1)
>>>>>>>>             .to(partitionedTableDynamicDestinations)
>>>>>>>>             .withFormatFunction(
>>>>>>>>                 (SerializableFunction<Event, TableRow>)
>>>>>>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>
>>>>>>>> pipeline.run().waitUntilFinish();
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tobi
>>>>>>>>
>>>>>>>> [0] http://streamingsystems.net/
>>>>>>>>
>>>>>>>
