Thanks Kenn, forwarding to user@ is a good idea; just did that.

JB - this is orthogonal to SDF, because I'd expect this transform to be
primarily used for waiting on the results of SomethingIO.write(), whereas
SDF is primarily useful for implementing SomethingIO.read().
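
To make the per-window "do this, then that" behaviour concrete, here is a toy
model in plain Java. It is not Beam code and not how Wait.on() is implemented:
the class WaitOnModel and its methods are made up for illustration, windows are
just string labels, and "the signal window is done" is an explicit method call
rather than a watermark. It only shows the gating idea: main elements are held
per window and released once the signal for that same window is done.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy, single-threaded model of per-window "wait on signal" gating. Not Beam code. */
public class WaitOnModel {
    // Main elements held back until their window's signal is done.
    private final Map<String, List<String>> buffered = new HashMap<>();
    // Windows in which the signal collection can receive no more elements.
    private final Set<String> doneSignalWindows = new HashSet<>();
    // Elements released downstream, in release order.
    private final List<String> emitted = new ArrayList<>();

    /** An element of the main collection arrives in the given window. */
    public void onMainElement(String window, String element) {
        if (doneSignalWindows.contains(window)) {
            emitted.add(element); // signal already done: pass straight through
        } else {
            buffered.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
        }
    }

    /** The signal collection is done for this window: release what was held. */
    public void onSignalWindowDone(String window) {
        doneSignalWindows.add(window);
        List<String> ready = buffered.remove(window);
        if (ready != null) {
            emitted.addAll(ready);
        }
    }

    /** Snapshot of everything released so far. */
    public List<String> emitted() {
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        WaitOnModel model = new WaitOnModel();
        model.onMainElement("w1", "a");
        model.onMainElement("w2", "b");
        System.out.println(model.emitted()); // [] (both windows still waiting)
        model.onSignalWindowDone("w1");
        System.out.println(model.emitted()); // [a] (w2 is unaffected)
        model.onMainElement("w1", "c");
        model.onSignalWindowDone("w2");
        System.out.println(model.emitted()); // [a, c, b]
    }
}
```

Each window is gated independently, which is why the same pattern can make
sense in a streaming pipeline: one window finishing does not release another.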

On Mon, May 14, 2018 at 10:25 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Cool !!!
>
> I guess we can leverage this in IOs with SDF.
>
> Thanks
> Regards
> JB
>
> On 14/05/2018 23:48, Eugene Kirpichov wrote:
> > Hi folks,
> >
> > Wanted to give a heads up about the existence of a commonly requested
> > feature and its first successful production usage.
> >
> > The feature is the Wait.on() transform [1], and the first successful
> > production usage is in Spanner [2].
> >
> > The Wait.on() transform allows you to "do this, then that" - in the
> > sense that a.apply(Wait.on(signal)) re-emits PCollection "a", but only
> > after the PCollection "signal" is "done" in the same window (i.e. when
> > no more elements can arrive into the same window of "signal"). The
> > PCollection "signal" is typically a collection of results of some
> > operation - so Wait.on(signal) allows you to wait until that operation
> > is done. It transparently works correctly in streaming pipelines too.
> >
> > This may sound a little convoluted, so the example from documentation
> > should help.
> >
> > PCollection<Void> firstWriteResults =
> >     data.apply(ParDo.of(...write to first database...));
> > data.apply(Wait.on(firstWriteResults))
> >       // Windows of this intermediate PCollection will be processed no
> >       // earlier than when the respective window of firstWriteResults closes.
> >       .apply(ParDo.of(...write to second database...));
> >
> > This is indeed what Spanner folks have done, and AFAIK they intend this
> > for importing multiple dependent database tables - e.g. first import a
> > parent table; when it's done, import the child table - all within one
> > pipeline. You can see example code in the tests [3].
> >
> > Please note that this kind of stuff requires support from the IO
> > connector - IO.write() has to return a result that can be waited on. The
> > code of SpannerIO is a great example; another example is FileIO.write().
> >
> > People have expressed wishes for similar support in Bigtable and
> > BigQuery connectors but it's not there yet. It would be really cool if
> > somebody added it to these connectors or others (I think there was a
> > recent thread discussing how to add it to BigQueryIO).
> >
> > [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
> > [2] https://github.com/apache/beam/pull/4264
> > [3] https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158
> >
>
