Awesome!! Thanks for the heads up, very exciting, this is going to make a
lot of people happy :)

On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <> wrote:

> +
> Just a quick email to let you know that I'm starting developing this.
> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <>
> wrote:
>> Hi Carlos,
>> Thank you for expressing interest in taking this on! Let me give you a
>> few pointers to start, and I'll be happy to help everywhere along the way.
>> Basically we want BigQueryIO.write() to return something (e.g. a
>> PCollection) that can be used as input to Wait.on().
>> Currently it returns a WriteResult, which only contains a
>> PCollection<TableRow> of failed inserts - that one can not be used
>> directly, instead we should add another component to WriteResult that
>> represents the result of successfully writing some data.
>> Given that BQIO supports dynamic destination writes, I think it makes
>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>> we could sequence different destinations independently (currently Wait.on()
>> does not provide such a feature, but it could); and it will require
>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>> might be - it is something that represents the result of successfully
>> writing a window of data. I think it can even be Void, or "?" (wildcard
>> type) for now, until we figure out something better.
>> Implementing this would require roughly the following work:
>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>> - Modify the BatchLoads transform to provide it on both codepaths:
>> expandTriggered() and expandUntriggered()
>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>> and multi-partition. Both need to be handled - we need to get a
>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>> PCollections together to get the final result. The single-partition
>> codepath (writeSinglePartition) under the hood already uses WriteTables
>> that returns a KV<DestinationT, ...> so it's directly usable. The
>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>> codepath drops DestinationT along the way and will need to be refactored a
>> bit to keep it until the end.
>> ...- expandUntriggered() should be treated the same way.
>> - Modify the StreamingWriteTables transform to provide it
>> ...- Here also, the challenge is to propagate the DestinationT type all
>> the way until the end of StreamingWriteTables - it will need to be
>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>> should be easy.
>> Another challenge with all of this is backwards compatibility in terms of
>> API and pipeline update.
>> Pipeline update is much less of a concern for the BatchLoads codepath,
>> because it's typically used in batch-mode pipelines that don't get updated.
>> I would recommend to start with this, perhaps even with only the
>> untriggered codepath (it is much more commonly used) - that will pave the
>> way for future work.
>> Hope this helps, please ask more if something is unclear!
>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <>
>> wrote:
>>> Hey Eugene!!
>>> I’d gladly take a stab on it although I’m not sure how much available
>>> time I might have to put into but... yeah, let’s try it.
>>> Where should I begin? Is there a Jira issue or shall I file one?
>>> Thanks!
>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <>
>>> wrote:
>>>> Hi,
>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>> be a welcome contribution to change this - many people expressed interest
>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>> helping out?
>>>> Thanks.
>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <>
>>>> wrote:
>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>> understanding. I'd also be interested in getting batch load result's
>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>> Thanks!
>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>> wrote:
>>>>>> Hi All,
>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>> Pubsub message to trigger further processing.
>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>> Advice appreciated.
>>>>>> Here's (most of) the relevant test code:
>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>         TableFieldSchema f1 = new
>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>         TableSchema s2 = new
>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>>> BigQueryIO.<String>write() //
>>>>>>                 .to(options.getTableSpec()) //
>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>> //
>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>                 .withSchema(s2)
>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>> //
>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>> OnCompletion()));
>>>>>> where
>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>> small sleep for testing purposes, and
>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>> In production code, OnCompletion would be fed some collection derived
>>>>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>>>>> message" rather than print..
>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, 
>>>>>> but
>>>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>>> failedInserts collection, ie data which is not connected to the
>>>>>> batch-load-job that is triggered:
>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>     PCollection<TableRow> empty =
>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>     return, new TupleTag<>("failedInserts"), empty);
>>>>>>   }
>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>>> once a job is submitted the code repeatedly polls the job status until it
>>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>>> completion-state via the failedInserts stream).
>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>> Thanks,
>>>>>> Simon

Reply via email to