Hi Carlos,

Any updates / roadblocks you hit?

On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Awesome!! Thanks for the heads up, very exciting, this is going to make a
> lot of people happy :)
>
> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> + dev@beam.apache.org
>>
>> Just a quick email to let you know that I'm starting developing this.
>>
>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Hi Carlos,
>>>
>>> Thank you for expressing interest in taking this on! Let me give you a
>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>
>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>> PCollection) that can be used as input to Wait.on().
>>> Currently it returns a WriteResult, which only contains a
>>> PCollection<TableRow> of failed inserts - that one can not be used
>>> directly, instead we should add another component to WriteResult that
>>> represents the result of successfully writing some data.
>>>
>>> Given that BQIO supports dynamic destination writes, I think it makes
>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>> we could sequence different destinations independently (currently Wait.on()
>>> does not provide such a feature, but it could); and it will require
>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>> might be - it is something that represents the result of successfully
>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>> type) for now, until we figure out something better.
>>>
>>> Implementing this would require roughly the following work:
>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>> expandTriggered() and expandUntriggered()
>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>>> and multi-partition. Both need to be handled - we need to get a
>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>>> PCollections together to get the final result. The single-partition
>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>> codepath drops DestinationT along the way and will need to be refactored a
>>> bit to keep it until the end.
>>> ...- expandUntriggered() should be treated the same way.
>>> - Modify the StreamingWriteTables transform to provide it
>>> ...- Here also, the challenge is to propagate the DestinationT type all
>>> the way until the end of StreamingWriteTables - it will need to be
>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>> should be easy.
>>>
>>> Another challenge with all of this is backwards compatibility in terms
>>> of API and pipeline update.
>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>> because it's typically used in batch-mode pipelines that don't get updated.
>>> I would recommend to start with this, perhaps even with only the
>>> untriggered codepath (it is much more commonly used) - that will pave the
>>> way for future work.
>>>
>>> Hope this helps, please ask more if something is unclear!
>>>
>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hey Eugene!!
>>>>
>>>> I’d gladly take a stab on it although I’m not sure how much available
>>>> time I might have to put into but... yeah, let’s try it.
>>>>
>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>
>>>> Thanks!
>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>> implemented in a way that it can be used with Wait.on(). It would 
>>>>> certainly
>>>>> be a welcome contribution to change this - many people expressed interest
>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>> helping out?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>>> understanding. I'd also be interested in getting batch load result's
>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>> simon.kitch...@unbelievable-machine.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>>> Pubsub message to trigger further processing.
>>>>>>>
>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>
>>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to 
>>>>>>> work.
>>>>>>> Advice appreciated.
>>>>>>>
>>>>>>> Here's (most of) the relevant test code:
>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>
>>>>>>>         TableFieldSchema f1 = new
>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>         TableSchema s2 = new
>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>
>>>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>>>> BigQueryIO.<String>write() //
>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>> //
>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>                 .withSchema(s2)
>>>>>>>
>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>> //
>>>>>>>
>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>>
>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>> OnCompletion()));
>>>>>>>
>>>>>>> where
>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>> small sleep for testing purposes, and
>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>
>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>> derived from lines, eg min/max record id, and the operation would be 
>>>>>>> "send
>>>>>>> pubsub message" rather than print..
>>>>>>>
>>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>>> then the data would be uploaded, then OnCompletion would print each 
>>>>>>> line.
>>>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>
>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, 
>>>>>>> but
>>>>>>> believe that it can be used as a "signal" for the Wait.on - ie the 
>>>>>>> output
>>>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>
>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an 
>>>>>>> "empty"
>>>>>>> failedInserts collection, ie data which is not connected to the
>>>>>>> batch-load-job that is triggered:
>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>     PCollection<TableRow> empty =
>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>   }
>>>>>>>
>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>>>> once a job is submitted the code repeatedly polls the job status until 
>>>>>>> it
>>>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>>>> completion-state via the failedInserts stream).
>>>>>>>
>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simon
>>>>>>>
>>>>>>>

Reply via email to