Nice!

Will user@ mostly enjoy the benefits through enhanced IOs? If you think it
has more broad applicability in end-user pipelines, I guess the time to
announce this to user@ is as part of the release notes? Do we have a
suitable format/venue for email-length blurbs like this one (too long for a
one-liners, too short and in-the-weeds for a blog post)? I didn't just
forward it because I wanted to let you & others consider.

Kenn

On Mon, May 14, 2018 at 4:58 PM Tyler Akidau <taki...@google.com> wrote:

> Nice! I like the clean integration with streaming.
>
> -Tyler
>
> On Mon, May 14, 2018 at 2:48 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Hi folks,
>>
>> Wanted to give a heads up about the existence of a commonly requested
>> feature and its first successful production usage.
>>
>> The feature is the Wait.on() transform [1] , and the first successful
>> production usage is in Spanner [2] .
>>
>> The Wait.on() transform allows you to "do this, then that" - in the sense
>> that a.apply(Wait.on(signal)) re-emits PCollection "a", but only after the
>> PCollection "signal" is "done" in the same window (i.e. when no more
>> elements can arrive into the same window of "signal"). The PCollection
>> "signal" is typically a collection of results of some operation - so
>> Wait.on(signal) allows you to wait until that operation is done. It
>> transparently works correctly in streaming pipelines too.
>>
>> This may sound a little convoluted, so the example from documentation
>> should help.
>>
>> PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to
>> first database...));
>> data.apply(Wait.on(firstWriteResults))
>>      // Windows of this intermediate PCollection will be processed no
>> earlier than when
>>      // the respective window of firstWriteResults closes.
>>      .apply(ParDo.of(...write to second database...));
>>
>> This is indeed what Spanner folks have done, and AFAIK they intend this
>> for importing multiple dependent database tables - e.g. first import a
>> parent table; when it's done, import the child table - all within one
>> pipeline. You can see example code in the tests [3].
>>
>> Please note that this kind of stuff requires support from the IO
>> connector - IO.write() has to return a result that can be waited on. The
>> code of SpannerIO is a great example; another example is FileIO.write().
>>
>> People have expressed wishes for similar support in Bigtable and BigQuery
>> connectors but it's not there yet. It would be really cool if somebody
>> added it to these connectors or others (I think there was a recent thread
>> discussing how to add it to BigQueryIO).
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
>> [2] https://github.com/apache/beam/pull/4264
>> [3]
>> https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158
>>
>>
>

Reply via email to