yes, i must say i have been waiting for this for over 6 months, it
would help a lot
chaim
On Tue, Jul 3, 2018 at 5:14 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
> Awesome!! Thanks for the heads up, very exciting, this is going to make a lot 
> of people happy :)
>
> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>
>> + dev@beam.apache.org
>>
>> Just a quick email to let you know that I'm starting developing this.
>>
>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com> 
>> wrote:
>>>
>>> Hi Carlos,
>>>
>>> Thank you for expressing interest in taking this on! Let me give you a few 
>>> pointers to start, and I'll be happy to help everywhere along the way.
>>>
>>> Basically we want BigQueryIO.write() to return something (e.g. a 
>>> PCollection) that can be used as input to Wait.on().
>>> Currently it returns a WriteResult, which only contains a 
>>> PCollection<TableRow> of failed inserts - that one can not be used 
>>> directly, instead we should add another component to WriteResult that 
>>> represents the result of successfully writing some data.
>>>
>>> Given that BQIO supports dynamic destination writes, I think it makes sense 
>>> for that to be a PCollection<KV<DestinationT, ???>> so that in theory we 
>>> could sequence different destinations independently (currently Wait.on() 
>>> does not provide such a feature, but it could); and it will require 
>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???" 
>>> might be - it is something that represents the result of successfully 
>>> writing a window of data. I think it can even be Void, or "?" (wildcard 
>>> type) for now, until we figure out something better.
>>>
>>> Implementing this would require roughly the following work:
>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>> - Modify the BatchLoads transform to provide it on both codepaths: 
>>> expandTriggered() and expandUntriggered()
>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition and 
>>> multi-partition. Both need to be handled - we need to get a 
>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two 
>>> PCollections together to get the final result. The single-partition 
>>> codepath (writeSinglePartition) under the hood already uses WriteTables 
>>> that returns a KV<DestinationT, ...> so it's directly usable. The 
>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this 
>>> codepath drops DestinationT along the way and will need to be refactored a 
>>> bit to keep it until the end.
>>> ...- expandUntriggered() should be treated the same way.
>>> - Modify the StreamingWriteTables transform to provide it
>>> ...- Here also, the challenge is to propagate the DestinationT type all the 
>>> way until the end of StreamingWriteTables - it will need to be refactored. 
>>> After such a refactoring, returning a KV<DestinationT, ...> should be easy.
>>>
>>> Another challenge with all of this is backwards compatibility in terms of 
>>> API and pipeline update.
>>> Pipeline update is much less of a concern for the BatchLoads codepath, 
>>> because it's typically used in batch-mode pipelines that don't get updated. 
>>> I would recommend to start with this, perhaps even with only the 
>>> untriggered codepath (it is much more commonly used) - that will pave the 
>>> way for future work.
>>>
>>> Hope this helps, please ask more if something is unclear!
>>>
>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>
>>>> Hey Eugene!!
>>>>
>>>> I’d gladly take a stab on it although I’m not sure how much available time 
>>>> I might have to put into but... yeah, let’s try it.
>>>>
>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>
>>>> Thanks!
>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com> 
>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, you're both right - BigQueryIO.write() is currently not implemented 
>>>>> in a way that it can be used with Wait.on(). It would certainly be a 
>>>>> welcome contribution to change this - many people expressed interest in 
>>>>> specifically waiting for BigQuery writes. Is any of you interested in 
>>>>> helping out?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com> 
>>>>> wrote:
>>>>>>
>>>>>> Hi Simon, I think your explanation was very accurate, at least to my 
>>>>>> understanding. I'd also be interested in getting batch load result's 
>>>>>> feedback on the pipeline... hopefully someone may suggest something, 
>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching 
>>>>>> <simon.kitch...@unbelievable-machine.com> wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a 
>>>>>>> Pubsub message to trigger further processing.
>>>>>>>
>>>>>>> I found this thread titled "Callbacks/other functions run after a 
>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>   
>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>
>>>>>>> Unfortunately, it appears that the Wait.on transform does not work with 
>>>>>>> BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work. 
>>>>>>> Advice appreciated.
>>>>>>>
>>>>>>> Here's (most of) the relevant test code:
>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>         PCollection<String> lines = p.apply("Read Input", 
>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>
>>>>>>>         TableFieldSchema f1 = new 
>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>         TableSchema s2 = new 
>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>
>>>>>>>         WriteResult writeResult = lines.apply("Write and load data", 
>>>>>>> BigQueryIO.<String>write() //
>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>> //                
>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>                 .withSchema(s2)
>>>>>>>                 
>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>  //
>>>>>>>                 
>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>>         
>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new 
>>>>>>> OnCompletion()));
>>>>>>>
>>>>>>> where
>>>>>>> + format-function "SlowFormatter" prints out each line and has a small 
>>>>>>> sleep for testing purposes, and
>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>
>>>>>>> In production code, OnCompletion would be fed some collection derived 
>>>>>>> from lines, eg min/max record id, and the operation would be "send 
>>>>>>> pubsub message" rather than print..
>>>>>>>
>>>>>>> My expectation is that the "SlowFormatter" would run for each line, 
>>>>>>> then the data would be uploaded, then OnCompletion would print each 
>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. However 
>>>>>>> for FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>
>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that 
>>>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, 
>>>>>>> but believe that it can be used as a "signal" for the Wait.on - ie the 
>>>>>>> output is "complete for window" only after all data has been uploaded, 
>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>
>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that 
>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an 
>>>>>>> "empty" failedInserts collection, ie data which is not connected to the 
>>>>>>> batch-load-job that is triggered:
>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>     PCollection<TableRow> empty =
>>>>>>>         p.apply("CreateEmptyFailedInserts", 
>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>   }
>>>>>>>
>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; 
>>>>>>> once a job is submitted the code repeatedly polls the job status until 
>>>>>>> it reaches DONE or FAILED. However that information does not appear to 
>>>>>>> be exposed anywhere (unlike streaming which effectively exposes 
>>>>>>> completion-state via the failedInserts stream).
>>>>>>>
>>>>>>> If I have misunderstood something, corrections welcome! If not, 
>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simon
>>>>>>>

-- 


Loans are funded by
FinWise Bank, a Utah-chartered bank located in Sandy, 
Utah, member FDIC, Equal
Opportunity Lender. Merchant Cash Advances are 
made by Behalf. For more
information on ECOA, click here 
<https://www.behalf.com/legal/ecoa/>. For important information about 
opening a new
account, review Patriot Act procedures here 
<https://www.behalf.com/legal/patriot/>.
Visit Legal 
<https://www.behalf.com/legal/> to
review our comprehensive program terms, 
conditions, and disclosures. 

Reply via email to