Nice! I like the clean integration with streaming.

-Tyler

On Mon, May 14, 2018 at 2:48 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> Hi folks,
>
> Wanted to give a heads up about the existence of a commonly requested
> feature and its first successful production usage.
>
> The feature is the Wait.on() transform [1], and the first successful
> production usage is in Spanner [2].
>
> The Wait.on() transform allows you to "do this, then that" - in the sense
> that a.apply(Wait.on(signal)) re-emits PCollection "a", but only after the
> PCollection "signal" is "done" in the same window (i.e. when no more
> elements can arrive into the same window of "signal"). The PCollection
> "signal" is typically a collection of results of some operation - so
> Wait.on(signal) allows you to wait until that operation is done. It
> transparently works correctly in streaming pipelines too.
>
> This may sound a little convoluted, so the example from the documentation
> should help.
>
> PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
> data.apply(Wait.on(firstWriteResults))
>      // Windows of this intermediate PCollection will be processed no earlier than when
>      // the respective window of firstWriteResults closes.
>      .apply(ParDo.of(...write to second database...));
>
> This is indeed what Spanner folks have done, and AFAIK they intend this
> for importing multiple dependent database tables - e.g. first import a
> parent table; when it's done, import the child table - all within one
> pipeline. You can see example code in the tests [3].
>
> Please note that this kind of stuff requires support from the IO connector:
> IO.write() has to return a result that can be waited on. The code of
> SpannerIO is a great example; another example is FileIO.write().
>
> People have expressed wishes for similar support in Bigtable and BigQuery
> connectors but it's not there yet. It would be really cool if somebody
> added it to these connectors or others (I think there was a recent thread
> discussing how to add it to BigQueryIO).
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
> [2] https://github.com/apache/beam/pull/4264
> [3] https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158
>
>
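
For anyone who wants to run the documentation example end to end, here is a minimal sketch. It assumes the Beam Java SDK (beam-sdks-java-core) and the DirectRunner (beam-runners-direct-java) are on the classpath. FakeDbWrite, WriteFn and the in-memory WRITES queue are hypothetical stand-ins for real database writes, not part of any Beam connector. The point it illustrates is the connector contract described above: the write transform returns a PCollection<Void> that a later step can pass to Wait.on().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class WaitOnExample {
  // Hypothetical in-memory "databases": the DirectRunner executes in this JVM,
  // so a static, thread-safe queue records the order in which writes happen.
  static final Queue<String> WRITES = new ConcurrentLinkedQueue<>();

  // Hypothetical DoFn that "writes" each element and emits nothing.
  static class WriteFn extends DoFn<String, Void> {
    private final String db;

    WriteFn(String db) {
      this.db = db;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      WRITES.add(db + ":" + c.element());
    }
  }

  // Hypothetical write transform that, like SpannerIO, returns a result
  // (an empty PCollection<Void>) whose per-window completion can be waited on.
  static class FakeDbWrite extends PTransform<PCollection<String>, PCollection<Void>> {
    private final String db;

    FakeDbWrite(String db) {
      this.db = db;
    }

    @Override
    public PCollection<Void> expand(PCollection<String> input) {
      return input.apply(ParDo.of(new WriteFn(db))).setCoder(VoidCoder.of());
    }
  }

  static List<String> runPipeline() {
    WRITES.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> data = p.apply(Create.of("a", "b", "c"));

    PCollection<Void> firstWriteResults = data.apply("WriteFirst", new FakeDbWrite("first"));
    // Elements are re-emitted only once the matching window of firstWriteResults is done.
    data.apply(Wait.<String>on(firstWriteResults))
        .apply("WriteSecond", new FakeDbWrite("second"));

    p.run().waitUntilFinish();
    return new ArrayList<>(WRITES);
  }

  public static void main(String[] args) {
    System.out.println(runPipeline());
  }
}
```

In this batch pipeline everything is in the global window, so all three "first:" writes should be recorded before any "second:" write; the order within each group may vary from run to run.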
