All good so far. I got a bit sidetracked, but I more or less have the idea:
use the JobStatus as part of the collection, so that it not only signals
completion but also gives access to the result (success/failure).
How does that sound?
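
To make that a bit more concrete, here is a rough plain-Java sketch (no Beam
dependencies) of what each element of that collection might carry. Everything
in it is a placeholder: LoadOutcome, its Status enum and failedDestinations()
are names invented for illustration, and a String stands in for DestinationT.
The real element type would presumably wrap BigQuery's job status rather than
this enum.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the outcome of one load job; not a Beam or BigQuery class. */
final class LoadOutcome {
  /** Assumed terminal states, mirroring the DONE/FAILED wording used in this thread. */
  enum Status { DONE, FAILED }

  final String destination; // stands in for DestinationT
  final Status status;

  LoadOutcome(String destination, Status status) {
    this.destination = destination;
    this.status = status;
  }

  /** Returns the destinations whose load job did not succeed, in input order. */
  static List<String> failedDestinations(List<LoadOutcome> outcomes) {
    List<String> failed = new ArrayList<>();
    for (LoadOutcome outcome : outcomes) {
      if (outcome.status == Status.FAILED) {
        failed.add(outcome.destination);
      }
    }
    return failed;
  }
}
```

A step sequenced after the write could then inspect the statuses and, for
example, only send its follow-up message when nothing failed.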

Regards

On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hi Carlos,
>
> Any updates / roadblocks you hit?
>
>
> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Awesome!! Thanks for the heads up, very exciting, this is going to make a
>> lot of people happy :)
>>
>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>
>>> + dev@beam.apache.org
>>>
>>> Just a quick email to let you know that I'm starting to develop this.
>>>
>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Thank you for expressing interest in taking this on! Let me give you a
>>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>>
>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>> PCollection) that can be used as input to Wait.on().
>>>> Currently it returns a WriteResult, which only contains a
>>>> PCollection<TableRow> of failed inserts. That one cannot be used
>>>> directly; instead, we should add another component to WriteResult that
>>>> represents the result of successfully writing some data.
>>>>
>>>> Given that BQIO supports dynamic destination writes, I think it makes
>>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>>> we could sequence different destinations independently (currently Wait.on()
>>>> does not provide such a feature, but it could); and it will require
>>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>>> might be - it is something that represents the result of successfully
>>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>>> type) for now, until we figure out something better.
>>>>
>>>> Implementing this would require roughly the following work:
>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>> expandTriggered() and expandUntriggered()
>>>>   - expandTriggered() itself writes via 2 codepaths: single-partition
>>>> and multi-partition. Both need to be handled - we need to get a
>>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>>>> PCollections together to get the final result. The single-partition
>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>> bit to keep it until the end.
>>>>   - expandUntriggered() should be treated the same way.
>>>> - Modify the StreamingWriteTables transform to provide it
>>>>   - Here also, the challenge is to propagate the DestinationT type all
>>>> the way until the end of StreamingWriteTables - it will need to be
>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>> should be easy.
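
For illustration only, the flattening step mentioned above can be pictured in
plain Java, with lists of Map.Entry standing in for
PCollection<KV<DestinationT, ...>>. FlattenSketch and its method are
hypothetical names, and the real change would use Beam's Flatten transform
rather than list concatenation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-Java analogy only; in Beam this would be a Flatten over two PCollections. */
final class FlattenSketch {
  /** Concatenates per-destination results from both codepaths, keeping every key. */
  static <D, R> List<Map.Entry<D, R>> flatten(
      List<Map.Entry<D, R>> singlePartition, List<Map.Entry<D, R>> multiPartition) {
    List<Map.Entry<D, R>> all = new ArrayList<>(singlePartition);
    all.addAll(multiPartition);
    return all;
  }
}
```

The point of the sketch is just that both inputs must still carry the
destination key when they are merged, which is why the multi-partition path
has to stop dropping it.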
>>>>
>>>> Another challenge with all of this is backwards compatibility in terms
>>>> of API and pipeline update.
>>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>>> because it's typically used in batch-mode pipelines that don't get updated.
>>>> I would recommend starting with this, perhaps even with only the
>>>> untriggered codepath (it is much more commonly used) - that will pave the
>>>> way for future work.
>>>>
>>>> Hope this helps, please ask more if something is unclear!
>>>>
>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> Hey Eugene!!
>>>>>
>>>>> I’d gladly take a stab at it, although I’m not sure how much time
>>>>> I’ll have to put into it, but... yeah, let’s try it.
>>>>>
>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>
>>>>> Thanks!
>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>> in specifically waiting for BigQuery writes. Are either of you interested in
>>>>>> helping out?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>>>> understanding. I'd also be interested in getting the batch load's result
>>>>>>> fed back into the pipeline... hopefully someone can suggest something;
>>>>>>> otherwise we could file a Jira issue, or even better, submit a PR!! :)
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>> simon.kitch...@unbelievable-machine.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>>>> Pubsub message to trigger further processing.
>>>>>>>>
>>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>>
>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>
>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>
>>>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>>>> with BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>>> Advice appreciated.
>>>>>>>>
>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>         PCollection<String> lines =
>>>>>>>>             p.apply("Read Input", Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>
>>>>>>>>         TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
>>>>>>>>         TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>
>>>>>>>>         WriteResult writeResult =
>>>>>>>>             lines.apply(
>>>>>>>>                 "Write and load data",
>>>>>>>>                 BigQueryIO.<String>write()
>>>>>>>>                     .to(options.getTableSpec())
>>>>>>>>                     .withFormatFunction(new SlowFormatter())
>>>>>>>>                     .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>                     // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>>>>                     .withSchema(s2)
>>>>>>>>                     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>                     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>
>>>>>>>>         lines
>>>>>>>>             .apply(Wait.on(writeResult.getFailedInserts()))
>>>>>>>>             .apply(ParDo.of(new OnCompletion()));
>>>>>>>>
>>>>>>>> where
>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>> small sleep for testing purposes, and
>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>
>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>> derived from lines, e.g. min/max record id, and the operation would be
>>>>>>>> "send pubsub message" rather than print.
>>>>>>>>
>>>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>>>>> And indeed that happens when STREAMING_INSERTS is used. However, for
>>>>>>>> FILE_LOADS, OnCompletion runs before the upload takes place.
>>>>>>>>
>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>> that BigQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on,
>>>>>>>> i.e. the output is "complete for window" only after all data has been
>>>>>>>> uploaded, which is what I need. And that does seem to work for
>>>>>>>> STREAMING_INSERTS.
>>>>>>>>
>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>>>>> failedInserts collection, i.e. data which is not connected to the
>>>>>>>> batch-load-job that is triggered:
>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>         p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>>   }
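
The suspected effect can be imitated outside Beam. The sketch below is only an
analogy, using java.util.concurrent.CompletableFuture in place of PCollections
and Wait.on: a follow-up gated on a signal that is tied to the upload runs
after it, while one gated on an unrelated, already-complete signal (like the
empty collection above) runs right away. SignalSketch and run() are made-up
names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Plain-Java analogy of the sequencing problem; it does not use Beam. */
final class SignalSketch {
  /**
   * Registers a follow-up on the signal, then finishes the upload,
   * and returns the order in which the two events were recorded.
   */
  static List<String> run(CompletableFuture<Void> signal, CompletableFuture<Void> upload) {
    List<String> events = new ArrayList<>();
    // The follow-up fires as soon as the signal is complete.
    signal.thenRun(() -> events.add("onCompletion"));
    events.add("upload finished");
    upload.complete(null);
    return events;
  }

  public static void main(String[] args) {
    CompletableFuture<Void> upload = new CompletableFuture<>();
    // Signal connected to the upload: the follow-up waits for it.
    System.out.println(run(upload, upload));
    // Signal unrelated to the upload and already complete: the follow-up runs first.
    System.out.println(run(CompletableFuture.completedFuture(null), new CompletableFuture<>()));
  }
}
```

If this analogy holds, the fix is less about Wait.on itself and more about
making the signal collection depend on the load job.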
>>>>>>>>
>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>> completion-state via the failedInserts stream).
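
As a rough picture of that polling behaviour (not the actual BatchLoads code),
the loop could look like the following. PollSketch, the Supplier-based state
lookup, and the plain "DONE"/"FAILED" strings are assumptions made for the
sketch; the real code talks to the BigQuery jobs API.

```java
import java.util.function.Supplier;

/** Hypothetical polling loop; the state strings follow the wording used in this thread. */
final class PollSketch {
  static boolean isTerminal(String state) {
    return "DONE".equals(state) || "FAILED".equals(state);
  }

  /**
   * Queries the job state up to maxAttempts times, sleeping between queries,
   * and returns the last state seen (terminal or not).
   */
  static String pollUntilTerminal(Supplier<String> jobState, int maxAttempts, long sleepMillis) {
    String state = jobState.get();
    for (int attempt = 1; attempt < maxAttempts && !isTerminal(state); attempt++) {
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        // Preserve the interrupt flag and give up with the last known state.
        Thread.currentThread().interrupt();
        return state;
      }
      state = jobState.get();
    }
    return state;
  }
}
```

The value this returns is exactly the piece of information that would need to
be emitted into an output collection for downstream steps to see it.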
>>>>>>>>
>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Simon
>>>>>>>>
>>>>>>>>
