I am using FILE_LOADS and no timestamp column, but partitioned tables. In
this case I have seen the following error, when I comment the windowing out
(direct runner):

Exception in thread "main"
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.ClassCastException:
org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to
org.apache.beam.sdk.transforms.windowing.IntervalWindow

On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni <[email protected]> wrote:

> Also my BQ table was partitioned based on a TIMESTAMP column, rather than
> being ingestion time based partitioning.
>
> Cheers
> Reza
>
> On Tue, 29 Jan 2019 at 15:44, Reza Rokni <[email protected]> wrote:
>
>> Hya Tobi,
>>
>> When you mention failed do you mean you get an error on running the
>> pipeline or there is a incorrect data issue?
>>
>> I was just trying some things with a PubSub source and a partitioned
>> table sink and was able to push things through, it was a very simple
>> pipeline through, with BigQueryIO.to() set to simple string.
>>
>> Cheers
>>
>> Reza
>>
>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> But it seems like that it fails, when I remove the windowing from the
>>> pipeline, so I guess the answer is a no.
>>>
>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <
>>> [email protected]> wrote:
>>>
>>>> Yes I am making use of partitioned tables, that's why I was wondering
>>>> if the windowing step could be skipped. :)
>>>>
>>>> Cheers
>>>> Tobi
>>>>
>>>> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>>>>
>>>>> My apologies Tobi too quick to hit the send button :-(
>>>>>
>>>>> I was checking to ask if you had also looked at partition tables in
>>>>> BigQuery, assuming the only partitioning you are doing is by Day.
>>>>>
>>>>> Cheers
>>>>> Reza
>>>>>
>>>>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>>>>
>>>>>> Hi Tobi,
>>>>>>
>>>>>> Are you making use of partitioned tables in BigQuery or shard tables?
>>>>>>
>>>>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Reza
>>>>>>
>>>>>>
>>>>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I was spending some time with the "Streaming Systems" [0] book over
>>>>>>> the weekend and I thought that my pipeline might be doing something "too
>>>>>>> much" as the BigQuery sink already should partition the data by day and 
>>>>>>> put
>>>>>>> it in the right place - so can my windowing function in the following
>>>>>>> pipeline be left out?
>>>>>>>
>>>>>>> I am asking this since sometimes I miss an element at the very edge
>>>>>>> of a window compared to a pipeline with a GCS sink and I thought maybe 
>>>>>>> that
>>>>>>> this is related to doing the same thing twice (windowing and then the 
>>>>>>> sink
>>>>>>> does "window" it again):
>>>>>>>
>>>>>>> pipeline
>>>>>>>         .apply(
>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>                 .withBootstrapServers(bootstrap)
>>>>>>>                 .withTopics(topics)
>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>
>>>>>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>                 .updateConsumerProperties(
>>>>>>>
>>>>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>> inputMessagesConfig))
>>>>>>>
>>>>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
>>>>>>> "earliest"))
>>>>>>>                 .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>>> groupId))
>>>>>>>
>>>>>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>                 .withReadCommitted()
>>>>>>>                 .withTimestampPolicyFactory(withEventTs)
>>>>>>>                 .commitOffsetsInFinalize())
>>>>>>>         .apply(ParDo.of(new ToEventFn()))
>>>>>>>
>>>>>>> // DELETE the following up to
>>>>>>>         .apply(
>>>>>>>             Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>>
>>>>>>>                 .triggering(
>>>>>>>                     Repeatedly.forever(
>>>>>>>                         AfterFirst.of(
>>>>>>>
>>>>>>> AfterPane.elementCountAtLeast(bundleSize),
>>>>>>>
>>>>>>> AfterProcessingTime.pastFirstElementInPane()
>>>>>>>                                 .plusDelayOf(refreshFrequency))))
>>>>>>>                 .withAllowedLateness(Duration.standardDays(14))
>>>>>>>                 .discardingFiredPanes())
>>>>>>> // HERE - END of DELETION
>>>>>>>
>>>>>>>         .apply(
>>>>>>>             BigQueryIO.<Event>write()
>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>                 .withTriggeringFrequency(refreshFrequency)
>>>>>>>                 .withNumFileShards(1)
>>>>>>>                 .to(partitionedTableDynamicDestinations)
>>>>>>>                 .withFormatFunction(
>>>>>>>                     (SerializableFunction<Event, TableRow>)
>>>>>>>                         KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>>
>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>
>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>>     pipeline.run().waitUntilFinish();
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Tobi
>>>>>>>
>>>>>>> [0] http://streamingsystems.net/
>>>>>>>
>>>>>>

Reply via email to