Awesome!! Thanks for the heads up, very exciting, this is going to make a lot of people happy :)
On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com> wrote:

> + dev@beam.apache.org
>
> Just a quick email to let you know that I'm starting to develop this.
>
> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> Hi Carlos,
>>
>> Thank you for expressing interest in taking this on! Let me give you a few pointers to start, and I'll be happy to help everywhere along the way.
>>
>> Basically, we want BigQueryIO.write() to return something (e.g. a PCollection) that can be used as input to Wait.on(). Currently it returns a WriteResult, which only contains a PCollection<TableRow> of failed inserts - that one cannot be used directly; instead, we should add another component to WriteResult that represents the result of successfully writing some data.
>>
>> Given that BQIO supports dynamic destination writes, I think it makes sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory we could sequence different destinations independently (currently Wait.on() does not provide such a feature, but it could), and it will require changing WriteResult to WriteResult<DestinationT>. As for what the "???" might be - it is something that represents the result of successfully writing a window of data. I think it can even be Void, or "?" (a wildcard type) for now, until we figure out something better.
>>
>> Implementing this would require roughly the following work:
>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>> - Modify the BatchLoads transform to provide it on both codepaths: expandTriggered() and expandUntriggered()
>>   - expandTriggered() itself writes via 2 codepaths: single-partition and multi-partition. Both need to be handled - we need to get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two PCollections together to get the final result.
>>     The single-partition codepath (writeSinglePartition) under the hood already uses WriteTables, which returns a KV<DestinationT, ...>, so it's directly usable. The multi-partition codepath ends in WriteRenameTriggered - unfortunately, this codepath drops DestinationT along the way and will need to be refactored a bit to keep it until the end.
>>   - expandUntriggered() should be treated the same way.
>> - Modify the StreamingWriteTables transform to provide it
>>   - Here also, the challenge is to propagate the DestinationT type all the way to the end of StreamingWriteTables - it will need to be refactored. After such a refactoring, returning a KV<DestinationT, ...> should be easy.
>>
>> Another challenge with all of this is backwards compatibility, in terms of both API and pipeline update. Pipeline update is much less of a concern for the BatchLoads codepath, because it's typically used in batch-mode pipelines that don't get updated. I would recommend starting there, perhaps even with only the untriggered codepath (it is much more commonly used) - that will pave the way for future work.
>>
>> Hope this helps - please ask more if something is unclear!
>>
>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>
>>> Hey Eugene!!
>>>
>>> I'd gladly take a stab at it, although I'm not sure how much available time I might have to put into it, but... yeah, let's try it.
>>>
>>> Where should I begin? Is there a Jira issue, or shall I file one?
>>>
>>> Thanks!
>>>
>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, you're both right - BigQueryIO.write() is currently not implemented in a way that lets it be used with Wait.on(). It would certainly be a welcome contribution to change this - many people have expressed interest in specifically waiting for BigQuery writes. Are either of you interested in helping out?
>>>>
>>>> Thanks.
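[Editor's note: since the end state Eugene describes can be hard to picture from prose alone, here is a rough usage sketch of what the finished feature might enable. This is not compilable Beam code: getSuccessfulTableLoads() and PublishPubsubMessageFn are hypothetical names invented for illustration, and the thread deliberately leaves the exact element type ("???") open.]

```java
// Hypothetical end state: WriteResult exposes, alongside
// getFailedInserts(), a per-destination success signal.
WriteResult writeResult = lines.apply("Write and load data",
    BigQueryIO.<String>write()
        .to(tableSpec)
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withSchema(schema));

// PCollection<KV<DestinationT, ?>> - an element is emitted for a
// destination only once a window of data has actually been written
// to that table.
PCollection<KV<TableDestination, Void>> done = writeResult.getSuccessfulTableLoads();

// Wait.on() then holds back downstream work (e.g. the Pubsub publish
// discussed later in this thread) until the writes for the
// corresponding window have completed.
lines.apply(Wait.on(done))
     .apply(ParDo.of(new PublishPubsubMessageFn()));
```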
>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>
>>>>> Hi Simon, I think your explanation was very accurate, at least to my understanding. I'd also be interested in getting batch load results fed back into the pipeline... hopefully someone may suggest something; otherwise we could file a Jira, or even better, a PR!! :)
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <simon.kitch...@unbelievable-machine.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub message to trigger further processing.
>>>>>>
>>>>>> I found this thread titled "Callbacks/other functions run after a PDone/output transform" on the user-list, which was very relevant:
>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>
>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>
>>>>>> Unfortunately, it appears that the Wait.on transform does not work with BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice appreciated.
>>>>>>
>>>>>> Here's (most of) the relevant test code:
>>>>>>
>>>>>> Pipeline p = Pipeline.create(options);
>>>>>> PCollection<String> lines = p.apply("Read Input",
>>>>>>     Create.of("line1", "line2", "line3", "line4"));
>>>>>>
>>>>>> TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
>>>>>> TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>>>>>
>>>>>> WriteResult writeResult = lines.apply("Write and load data",
>>>>>>     BigQueryIO.<String>write()
>>>>>>         .to(options.getTableSpec())
>>>>>>         .withFormatFunction(new SlowFormatter())
>>>>>>         .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>         // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>>         .withSchema(s2)
>>>>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>
>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>>>>     .apply(ParDo.of(new OnCompletion()));
>>>>>>
>>>>>> where:
>>>>>> + format-function "SlowFormatter" prints out each line and has a small sleep for testing purposes, and
>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>
>>>>>> In production code, OnCompletion would be fed some collection derived from lines, e.g. min/max record id, and the operation would be "send pubsub message" rather than print.
>>>>>>
>>>>>> My expectation is that the "SlowFormatter" would run for each line, then the data would be uploaded, then OnCompletion would print each line. And indeed that happens when STREAMING_INSERTS is used. However, for FILE_LOADS, OnCompletion runs before the upload takes place.
>>>>>>
>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that BigQueryIO.write() generates, AFAICT.
>>>>>> I don't expect any failed records, but I believe it can be used as a "signal" for the Wait.on - i.e. the output is "complete for window" only after all data has been uploaded, which is what I need. And that does seem to work for STREAMING_INSERTS.
>>>>>>
>>>>>> I suspect the reason this does not work for FILE_LOADS is that method BatchLoads.writeResult returns a WriteResult that wraps an "empty" failedInserts collection, i.e. data which is not connected to the batch-load job that is triggered:
>>>>>>
>>>>>> private WriteResult writeResult(Pipeline p) {
>>>>>>   PCollection<TableRow> empty =
>>>>>>       p.apply("CreateEmptyFailedInserts",
>>>>>>           Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>   return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>> }
>>>>>>
>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once a job is submitted, the code repeatedly polls the job status until it reaches DONE or FAILED. However, that information does not appear to be exposed anywhere (unlike streaming, which effectively exposes completion-state via the failedInserts stream).
>>>>>>
>>>>>> If I have misunderstood something, corrections welcome! If not, suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>
>>>>>> Thanks,
>>>>>> Simon
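[Editor's note: the "synchronous" polling Simon describes is the standard poll-until-terminal-state pattern. The following is a minimal self-contained sketch of that pattern in plain Java - it is not actual Beam code, and PollSketch is a name invented here; only the DONE/FAILED terminal states mirror what the thread says BatchLoads checks for.]

```java
import java.util.Arrays;
import java.util.Iterator;

public class PollSketch {
    // Walks successive status values until a terminal one (DONE or
    // FAILED) is seen, mimicking how BatchLoads waits on a submitted
    // BigQuery load job before returning.
    static String pollUntilTerminal(Iterator<String> statusPolls) {
        while (statusPolls.hasNext()) {
            String status = statusPolls.next();
            if (status.equals("DONE") || status.equals("FAILED")) {
                return status;
            }
            // A real implementation would sleep/back off between polls.
        }
        throw new IllegalStateException("job never reached a terminal state");
    }

    public static void main(String[] args) {
        Iterator<String> polls =
            Arrays.asList("PENDING", "RUNNING", "RUNNING", "DONE").iterator();
        System.out.println(pollUntilTerminal(polls));  // prints DONE
    }
}
```

The point of the sketch is that the completion information clearly exists inside BatchLoads at the moment the loop exits - it is just not surfaced through WriteResult.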