Nice! Will user@ mostly enjoy the benefits through enhanced IOs? If you think it has broader applicability in end-user pipelines, I guess the time to announce this to user@ is as part of the release notes? Do we have a suitable format/venue for email-length blurbs like this one (too long for a one-liner, too short and in-the-weeds for a blog post)? I didn't just forward it because I wanted to let you & others consider it first.
Kenn

On Mon, May 14, 2018 at 4:58 PM Tyler Akidau <taki...@google.com> wrote:

> Nice! I like the clean integration with streaming.
>
> -Tyler
>
> On Mon, May 14, 2018 at 2:48 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> Hi folks,
>>
>> Wanted to give a heads up about the existence of a commonly requested feature and its first successful production usage.
>>
>> The feature is the Wait.on() transform [1], and the first successful production usage is in Spanner [2].
>>
>> The Wait.on() transform allows you to "do this, then that" - in the sense that a.apply(Wait.on(signal)) re-emits PCollection "a", but only after the PCollection "signal" is "done" in the same window (i.e. when no more elements can arrive into the same window of "signal"). The PCollection "signal" is typically a collection of results of some operation - so Wait.on(signal) allows you to wait until that operation is done. It transparently works correctly in streaming pipelines too.
>>
>> This may sound a little convoluted, so the example from documentation should help.
>>
>>   PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
>>   data.apply(Wait.on(firstWriteResults))
>>       // Windows of this intermediate PCollection will be processed no earlier than when
>>       // the respective window of firstWriteResults closes.
>>       .apply(ParDo.of(...write to second database...));
>>
>> This is indeed what Spanner folks have done, and AFAIK they intend this for importing multiple dependent database tables - e.g. first import a parent table; when it's done, import the child table - all within one pipeline. You can see example code in the tests [3].
>>
>> Please note that this kind of stuff requires support from the IO connector - IO.write() has to return a result that can be waited on. The code of SpannerIO is a great example; another example is FileIO.write().
>>
>> People have expressed wishes for similar support in Bigtable and BigQuery connectors but it's not there yet. It would be really cool if somebody added it to these connectors or others (I think there was a recent thread discussing how to add it to BigQueryIO).
>>
>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
>> [2] https://github.com/apache/beam/pull/4264
>> [3] https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158
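To make the "done in the same window" semantics of Wait.on(signal) a bit more concrete, here is a toy, in-memory Java model of the gating it is described as providing. It is a sketch only, not Beam code and not how the real transform is built. The class WaitOnModel and its methods are hypothetical, windows are assumed to be fixed windows of 10 time units, a signal window is treated as done once the signal's watermark reaches the end of that window, and allowed lateness is ignored.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model (not Beam's implementation) of Wait.on(signal):
 * main-input elements in window w are held back until the signal's window w
 * is done, i.e. no more signal elements can arrive in w.
 */
public class WaitOnModel {
  // Assumed fixed windows: window index = floor(timestamp / WINDOW_SIZE).
  static final long WINDOW_SIZE = 10;

  // Buffered main-input elements per window index, iterated in window order.
  private final TreeMap<Long, List<String>> pending = new TreeMap<>();
  // Elements released downstream, in release order.
  private final List<String> emitted = new ArrayList<>();
  // Signal watermark; starts "before everything".
  private long signalWatermark = Long.MIN_VALUE;

  static long windowOf(long timestamp) {
    return Math.floorDiv(timestamp, WINDOW_SIZE);
  }

  // A signal window is done once the watermark reaches its end (lateness ignored).
  private boolean signalDone(long window) {
    return signalWatermark >= (window + 1) * WINDOW_SIZE;
  }

  /** A main-input element arrives: pass it through if its signal window is done, else buffer it. */
  void onMainElement(String value, long timestamp) {
    long w = windowOf(timestamp);
    if (signalDone(w)) {
      emitted.add(value);
    } else {
      pending.computeIfAbsent(w, k -> new ArrayList<>()).add(value);
    }
  }

  /** The signal's watermark advances: release buffered elements of every window that is now done. */
  void advanceSignalWatermark(long watermark) {
    signalWatermark = Math.max(signalWatermark, watermark);
    Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, List<String>> entry = it.next();
      if (!signalDone(entry.getKey())) {
        break; // windows are visited in order, so later ones are not done either
      }
      emitted.addAll(entry.getValue());
      it.remove();
    }
  }

  List<String> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    WaitOnModel model = new WaitOnModel();
    model.onMainElement("a", 3);      // window 0, buffered
    model.onMainElement("b", 12);     // window 1, buffered
    model.advanceSignalWatermark(10); // signal window 0 is done
    System.out.println(model.emitted()); // [a]
    model.onMainElement("c", 5);      // window 0 already done: passes straight through
    model.advanceSignalWatermark(20); // signal window 1 is done
    System.out.println(model.emitted()); // [a, c, b]
  }
}
```

Note how "c" arrives for window 0 after that signal window is already done, so it is released immediately, while "b" waits until the signal watermark reaches 20. In a real pipeline the runner and Wait.on take care of this per-window ordering; the model only illustrates the guarantee, which is why the IO's write() must return something (like the Spanner or FileIO results) that can serve as the signal.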