Yes, I must say I have been waiting for this for over 6 months; it would help a lot.

Chaim

On Tue, Jul 3, 2018 at 5:14 PM Eugene Kirpichov <[email protected]> wrote:
>
> Awesome!! Thanks for the heads up, very exciting, this is going to make a lot
> of people happy :)
>
> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <[email protected]> wrote:
>>
>> + [email protected]
>>
>> Just a quick email to let you know that I'm starting to develop this.
>>
>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <[email protected]>
>> wrote:
>>>
>>> Hi Carlos,
>>>
>>> Thank you for expressing interest in taking this on! Let me give you a few
>>> pointers to start, and I'll be happy to help everywhere along the way.
>>>
>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>> PCollection) that can be used as input to Wait.on().
>>> Currently it returns a WriteResult, which only contains a
>>> PCollection<TableRow> of failed inserts - that one cannot be used
>>> directly; instead, we should add another component to WriteResult that
>>> represents the result of successfully writing some data.
>>>
>>> Given that BQIO supports dynamic destination writes, I think it makes sense
>>> for that to be a PCollection<KV<DestinationT, ???>> so that in theory we
>>> could sequence different destinations independently (currently Wait.on()
>>> does not provide such a feature, but it could); and it will require
>>> changing WriteResult to WriteResult<DestinationT>. As for what the "???"
>>> might be - it is something that represents the result of successfully
>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>> type) for now, until we figure out something better.
>>>
>>> Implementing this would require roughly the following work:
>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>   expandTriggered() and expandUntriggered()
>>>   - expandTriggered() itself writes via 2 codepaths: single-partition
>>>     and multi-partition. Both need to be handled - we need to get a
>>>     PCollection<KV<DestinationT, ?>> from each of them, and Flatten these
>>>     two PCollections together to get the final result. The single-partition
>>>     codepath (writeSinglePartition) under the hood already uses WriteTables,
>>>     which returns a KV<DestinationT, ...>, so it's directly usable. The
>>>     multi-partition codepath ends in WriteRenameTriggered - unfortunately,
>>>     this codepath drops DestinationT along the way and will need to be
>>>     refactored a bit to keep it until the end.
>>>   - expandUntriggered() should be treated the same way.
>>> - Modify the StreamingWriteTables transform to provide it
>>>   - Here also, the challenge is to propagate the DestinationT type all the
>>>     way to the end of StreamingWriteTables - it will need to be refactored.
>>>     After such a refactoring, returning a KV<DestinationT, ...> should be
>>>     easy.
>>>
>>> Another challenge with all of this is backwards compatibility in terms of
>>> API and pipeline update.
>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>> because it's typically used in batch-mode pipelines that don't get updated.
>>> I would recommend starting with this, perhaps even with only the
>>> untriggered codepath (it is much more commonly used) - that will pave the
>>> way for future work.
>>>
>>> Hope this helps - please ask more if something is unclear!
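To make the proposal above concrete, here is a rough sketch of the user-facing end state it describes. The accessor name getSuccessfulWrites(), the element type KV<TableDestination, ?>, and PublishPubsubMessageFn are placeholders for illustration only; no such API existed in WriteResult at the time of this thread.

    // Hypothetical sketch only - names and types are placeholders for
    // whatever the refactoring finally exposes.
    WriteResult writeResult = rows.apply("WriteToBQ",
        BigQueryIO.writeTableRows()
            .to(tableSpec)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withSchema(schema));

    // The proposed new component of WriteResult: a per-destination
    // collection that becomes complete for a window only once that window's
    // data has actually been written (load jobs finished, renames done).
    PCollection<KV<TableDestination, ?>> done = writeResult.getSuccessfulWrites();

    // That would make BigQuery writes usable as a Wait.on() signal:
    // downstream work is held back until the writes for the corresponding
    // window have finished.
    messagesToPublish
        .apply(Wait.on(done))
        .apply("NotifyDownstream", ParDo.of(new PublishPubsubMessageFn()));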
>>>
>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <[email protected]> wrote:
>>>>
>>>> Hey Eugene!!
>>>>
>>>> I'd gladly take a stab at it, although I'm not sure how much available
>>>> time I might have to put into it, but... yeah, let's try it.
>>>>
>>>> Where should I begin? Is there a Jira issue, or shall I file one?
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <[email protected]>
>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, you're both right - BigQueryIO.write() is currently not implemented
>>>>> in a way that it can be used with Wait.on(). It would certainly be a
>>>>> welcome contribution to change this - many people have expressed interest
>>>>> in specifically waiting for BigQuery writes. Is either of you interested
>>>>> in helping out?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>>> understanding. I'd also be interested in getting the batch load result's
>>>>>> feedback in the pipeline... hopefully someone may suggest something;
>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching
>>>>>> <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I need to write some data to BigQuery (batch mode) and then send a
>>>>>>> Pubsub message to trigger further processing.
>>>>>>>
>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>> PDone/output transform" on the user-list, which was very relevant:
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>
>>>>>>> Unfortunately, it appears that the Wait.on transform does not work with
>>>>>>> BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>> Advice appreciated.
>>>>>>>
>>>>>>> Here's (most of) the relevant test code:
>>>>>>>
>>>>>>>   Pipeline p = Pipeline.create(options);
>>>>>>>   PCollection<String> lines = p.apply("Read Input",
>>>>>>>       Create.of("line1", "line2", "line3", "line4"));
>>>>>>>
>>>>>>>   TableFieldSchema f1 =
>>>>>>>       new TableFieldSchema().setName("value").setType("STRING");
>>>>>>>   TableSchema s2 =
>>>>>>>       new TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>
>>>>>>>   WriteResult writeResult = lines.apply("Write and load data",
>>>>>>>       BigQueryIO.<String>write()
>>>>>>>           .to(options.getTableSpec())
>>>>>>>           .withFormatFunction(new SlowFormatter())
>>>>>>>           .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>           // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>>>           .withSchema(s2)
>>>>>>>           .withCreateDisposition(
>>>>>>>               BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>           .withWriteDisposition(
>>>>>>>               BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>>   lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>>>>>       .apply(ParDo.of(new OnCompletion()));
>>>>>>>
>>>>>>> where
>>>>>>> + the format function "SlowFormatter" prints out each line and sleeps
>>>>>>>   briefly for testing purposes, and
>>>>>>> + the DoFn OnCompletion just prints out the contents of each line.
>>>>>>>
>>>>>>> In production code, OnCompletion would be fed some collection derived
>>>>>>> from lines, e.g. min/max record id, and the operation would be "send
>>>>>>> pubsub message" rather than print.
>>>>>>>
>>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>>> then the data would be uploaded, then OnCompletion would print each
>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. However,
>>>>>>> for FILE_LOADS, OnCompletion runs before the upload takes place.
>>>>>>>
>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>>>> BigQueryIO.write() generates, AFAICT. I don't expect any failed
>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on
>>>>>>> - i.e. the output is "complete for window" only after all data has
>>>>>>> been uploaded, which is what I need. And that does seem to work for
>>>>>>> STREAMING_INSERTS.
>>>>>>>
>>>>>>> I suspect the reason this does not work for FILE_LOADS is that the
>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an
>>>>>>> "empty" failedInserts collection, i.e. data which is not connected to
>>>>>>> the batch-load job that is triggered:
>>>>>>>
>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>     PCollection<TableRow> empty =
>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>             Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>   }
>>>>>>>
>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs:
>>>>>>> once a job is submitted, the code repeatedly polls the job status until
>>>>>>> it reaches DONE or FAILED. However, that information does not appear to
>>>>>>> be exposed anywhere (unlike streaming, which effectively exposes
>>>>>>> completion state via the failedInserts stream).
>>>>>>>
>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simon
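Simon's diagnosis hinges on how Wait.on treats its signal: the main input is held back until the signal PCollection can produce no more data for the corresponding window. Below is a minimal, self-contained sketch of those semantics, independent of BigQueryIO; all transform names here are illustrative only.

    // Minimal illustration of Wait.on semantics (Beam 2.4+).
    Pipeline p = Pipeline.create();

    PCollection<String> main = p.apply("Main", Create.of("a", "b", "c"));
    PCollection<String> signal = p.apply("Signal", Create.of("done"));

    // Elements of "main" are emitted only once "signal" is complete for the
    // corresponding window. A signal that is empty and complete immediately
    // (like the placeholder failedInserts built by BatchLoads.writeResult)
    // holds nothing back - which is why OnCompletion ran before the
    // FILE_LOADS upload in the test above.
    main.apply(Wait.on(signal))
        .apply("AfterSignal", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            System.out.println("Ran after signal completed: " + c.element());
          }
        }));

    p.run().waitUntilFinish();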
