I am using FILE_LOADS and partitioned tables, but no timestamp column (that is, ingestion-time partitioning). In this case I have seen the following error when I comment out the windowing (direct runner):
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to org.apache.beam.sdk.transforms.windowing.IntervalWindow

On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni <[email protected]> wrote:

> Also my BQ table was partitioned based on a TIMESTAMP column, rather than
> being ingestion time based partitioning.
>
> Cheers
> Reza
>
> On Tue, 29 Jan 2019 at 15:44, Reza Rokni <[email protected]> wrote:
>
>> Hya Tobi,
>>
>> When you mention failed, do you mean you get an error on running the
>> pipeline, or is there an incorrect-data issue?
>>
>> I was just trying some things with a PubSub source and a partitioned
>> table sink and was able to push things through; it was a very simple
>> pipeline though, with BigQueryIO.to() set to a simple string.
>>
>> Cheers
>>
>> Reza
>>
>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias <[email protected]> wrote:
>>
>>> But it seems that it fails when I remove the windowing from the
>>> pipeline, so I guess the answer is a no.
>>>
>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <[email protected]> wrote:
>>>
>>>> Yes, I am making use of partitioned tables, that's why I was wondering
>>>> if the windowing step could be skipped. :)
>>>>
>>>> Cheers
>>>> Tobi
>>>>
>>>> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni <[email protected]> wrote:
>>>>
>>>>> My apologies Tobi, too quick to hit the send button :-(
>>>>>
>>>>> I was checking to ask if you had also looked at partitioned tables in
>>>>> BigQuery, assuming the only partitioning you are doing is by day.
>>>>>
>>>>> Cheers
>>>>> Reza
>>>>>
>>>>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni <[email protected]> wrote:
>>>>>
>>>>>> Hi Tobi,
>>>>>>
>>>>>> Are you making use of partitioned tables in BigQuery or sharded tables?
>>>>>>
>>>>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Reza
>>>>>>
>>>>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <[email protected]> wrote:
>>>>>>
>>>>>>> I was spending some time with the "Streaming Systems" [0] book over
>>>>>>> the weekend and I thought that my pipeline might be doing something
>>>>>>> "too much", as the BigQuery sink should already partition the data by
>>>>>>> day and put it in the right place - so can the windowing function in
>>>>>>> the following pipeline be left out?
>>>>>>>
>>>>>>> I am asking since sometimes I miss an element at the very edge of a
>>>>>>> window, compared to a pipeline with a GCS sink, and I thought maybe
>>>>>>> this is related to doing the same thing twice (windowing, and then
>>>>>>> the sink "windows" it again):
>>>>>>>
>>>>>>> pipeline
>>>>>>>     .apply(
>>>>>>>         KafkaIO.<String, String>read()
>>>>>>>             .withBootstrapServers(bootstrap)
>>>>>>>             .withTopics(topics)
>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>             .updateConsumerProperties(
>>>>>>>                 ImmutableMap.of(
>>>>>>>                     InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
>>>>>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>             .withReadCommitted()
>>>>>>>             .withTimestampPolicyFactory(withEventTs)
>>>>>>>             .commitOffsetsInFinalize())
>>>>>>>     .apply(ParDo.of(new ToEventFn()))
>>>>>>>
>>>>>>>     // DELETE the following up to
>>>>>>>     .apply(
>>>>>>>         Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>>             .triggering(
>>>>>>>                 Repeatedly.forever(
>>>>>>>                     AfterFirst.of(
>>>>>>>                         AfterPane.elementCountAtLeast(bundleSize),
>>>>>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>                             .plusDelayOf(refreshFrequency))))
>>>>>>>             .withAllowedLateness(Duration.standardDays(14))
>>>>>>>             .discardingFiredPanes())
>>>>>>>     // HERE - END of DELETION
>>>>>>>
>>>>>>>     .apply(
>>>>>>>         BigQueryIO.<Event>write()
>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>             .withTriggeringFrequency(refreshFrequency)
>>>>>>>             .withNumFileShards(1)
>>>>>>>             .to(partitionedTableDynamicDestinations)
>>>>>>>             .withFormatFunction(
>>>>>>>                 (SerializableFunction<Event, TableRow>)
>>>>>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>> pipeline.run().waitUntilFinish();
>>>>>>>
>>>>>>> Best,
>>>>>>> Tobi
>>>>>>>
>>>>>>> [0] http://streamingsystems.net/
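partitionedTableDynamicDestinations itself is not shown in the thread, but the GlobalWindow-to-IntervalWindow ClassCastException at the top is the characteristic failure of a destination function that reads the element's window. A minimal hypothetical sketch of that shape (class name, table spec, and schema are illustrative, not taken from the code above; Event is the element type from the pipeline):

import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Hypothetical reconstruction: derive the daily partition from the element's
// window. This works under the day-sized IntervalWindows that
// Window.into(...) assigns, but throws the ClassCastException above once the
// windowing is removed, because elements then live in the GlobalWindow.
class WindowBasedDestinations extends DynamicDestinations<Event, String> {
  private static final DateTimeFormatter DAY = DateTimeFormat.forPattern("yyyyMMdd");

  @Override
  public String getDestination(ValueInSingleWindow<Event> element) {
    // The failure point: GlobalWindow is a BoundedWindow, not an IntervalWindow.
    IntervalWindow window = (IntervalWindow) element.getWindow();
    return "project:dataset.events$" + window.start().toString(DAY);
  }

  @Override
  public TableDestination getTable(String tableSpec) {
    return new TableDestination(tableSpec, "events, partitioned by day");
  }

  @Override
  public TableSchema getSchema(String tableSpec) {
    return new TableSchema(); // real Event schema omitted for brevity
  }
}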
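If the goal is to drop Window.into(...) entirely and let BigQuery's partitioning place the rows by day, one option is to route each element by its event-time timestamp, which is valid in any window including the GlobalWindow, via the to(SerializableFunction) overload of BigQueryIO.Write. A sketch under assumptions, not the thread's actual code; the table spec and time zone are invented:

import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Sketch: pick the daily partition from the element's event-time timestamp
// (assigned by the Kafka timestamp policy) instead of from its window.
class TimestampBasedPartition
    implements SerializableFunction<ValueInSingleWindow<Event>, TableDestination> {
  // static, since joda's DateTimeFormatter is not Serializable
  private static final DateTimeFormatter DAY =
      DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.forID("Europe/Zurich"));

  @Override
  public TableDestination apply(ValueInSingleWindow<Event> input) {
    String partition = input.getTimestamp().toString(DAY);
    // The $-decorator targets one daily partition of an ingestion-time
    // partitioned table; a column-partitioned table would instead take the
    // plain table spec and route on the TIMESTAMP column.
    return new TableDestination("project:dataset.events$" + partition, null);
  }
}

This would be used as .to(new TimestampBasedPartition()) in place of the DynamicDestinations; with CREATE_IF_NEEDED it also needs .withSchema(...), since there is no DynamicDestinations to supply one. Note that FILE_LOADS on an unbounded source still requires .withTriggeringFrequency(...) (and .withNumFileShards(...)) regardless of windowing, so those sink settings stay as they are in the pipeline above.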
