Nice! I like the clean integration with streaming.

-Tyler

On Mon, May 14, 2018 at 2:48 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> Hi folks,
>
> Wanted to give a heads up about the existence of a commonly requested
> feature and its first successful production usage.
>
> The feature is the Wait.on() transform [1], and the first successful
> production usage is in Spanner [2].
>
> The Wait.on() transform allows you to "do this, then that" - in the sense
> that a.apply(Wait.on(signal)) re-emits PCollection "a", but only after the
> PCollection "signal" is "done" in the same window (i.e. when no more
> elements can arrive into the same window of "signal"). The PCollection
> "signal" is typically a collection of results of some operation - so
> Wait.on(signal) allows you to wait until that operation is done. It
> transparently works correctly in streaming pipelines too.
>
> This may sound a little convoluted, so the example from documentation
> should help.
>
> PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
> data.apply(Wait.on(firstWriteResults))
>     // Windows of this intermediate PCollection will be processed no earlier than when
>     // the respective window of firstWriteResults closes.
>     .apply(ParDo.of(...write to second database...));
>
> This is indeed what Spanner folks have done, and AFAIK they intend this
> for importing multiple dependent database tables - e.g. first import a
> parent table; when it's done, import the child table - all within one
> pipeline. You can see example code in the tests [3].
>
> Please note that this kind of stuff requires support from the IO connector
> - IO.write() has to return a result that can be waited on. The code of
> SpannerIO is a great example; another example is FileIO.write().
>
> People have expressed wishes for similar support in Bigtable and BigQuery
> connectors but it's not there yet. It would be really cool if somebody
> added it to these connectors or others (I think there was a recent thread
> discussing how to add it to BigQueryIO).
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
> [2] https://github.com/apache/beam/pull/4264
> [3] https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158
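
In case the per-window part of "do this, then that" is still hard to picture, here is a rough plain-Java analogy of the ordering described above. It is only a sketch and deliberately uses no Beam classes: the names WaitOnSketch, addElement and signalDone, and the window labels "w1" and "w2", are made up for illustration and are not part of the Beam API. It assumes a single thread and models "signal is done in this window" as an explicit method call.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java analogy of the Wait.on(signal) ordering; hypothetical names, no Beam involved. */
class WaitOnSketch {
    /** Events, recorded in the order they happen. */
    static final List<String> log = new ArrayList<>();
    /** Elements of "a" held back per window until that window of "signal" is done. */
    static final Map<String, List<String>> buffered = new TreeMap<>();
    /** Windows for which "signal" is already done. */
    static final Set<String> done = new HashSet<>();

    /** An element of "a" arrives: hold it back unless its window is already released. */
    static void addElement(String window, String element) {
        if (done.contains(window)) {
            log.add("emit " + element + " in " + window);
        } else {
            buffered.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
        }
    }

    /** No more "signal" elements can arrive in this window: release what was held back. */
    static void signalDone(String window) {
        log.add("signal done: " + window);
        done.add(window);
        List<String> ready = buffered.remove(window);
        if (ready != null) {
            for (String element : ready) {
                log.add("emit " + element + " in " + window);
            }
        }
    }

    /** Small scenario: two windows whose signals finish in a different order than data arrives. */
    static List<String> run() {
        log.clear();
        buffered.clear();
        done.clear();
        addElement("w1", "x");
        addElement("w2", "y");
        signalDone("w2");      // releases only the elements of window w2
        addElement("w1", "z"); // w1 is still waiting, so z is held back too
        signalDone("w1");      // now x and z are released
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The point of the scenario is that each window is gated independently: finishing the signal for w2 releases y but leaves x and z waiting on w1. In a real pipeline the runner decides when a window of "signal" is done, so nothing like signalDone is called by user code.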