This sounds super interesting and useful! Eugene, can you please elaborate on the phrase 'has to return a result that can be waited on'? It is not clear to me what this means, and I would like to understand it in order to evaluate which other IOs could potentially support this.
On Thu, May 17, 2018 at 10:13 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> Thanks Kenn, forwarding to user@ is a good idea; just did that.
>
> JB - this is orthogonal to SDF, because I'd expect this transform to be primarily used for waiting on the results of SomethingIO.write(), whereas SDF is primarily useful for implementing SomethingIO.read().
>
> On Mon, May 14, 2018 at 10:25 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
>> Cool !!!
>>
>> I guess we can leverage this in IOs with SDF.
>>
>> Thanks
>> Regards
>> JB
>>
>> On 14/05/2018 23:48, Eugene Kirpichov wrote:
>> > Hi folks,
>> >
>> > Wanted to give a heads up about the existence of a commonly requested
>> > feature and its first successful production usage.
>> >
>> > The feature is the Wait.on() transform [1], and the first successful
>> > production usage is in Spanner [2].
>> >
>> > The Wait.on() transform allows you to "do this, then that" - in the
>> > sense that a.apply(Wait.on(signal)) re-emits PCollection "a", but only
>> > after the PCollection "signal" is "done" in the same window (i.e. when
>> > no more elements can arrive into the same window of "signal"). The
>> > PCollection "signal" is typically a collection of results of some
>> > operation - so Wait.on(signal) allows you to wait until that operation
>> > is done. It transparently works correctly in streaming pipelines too.
>> >
>> > This may sound a little convoluted, so the example from the documentation
>> > should help.
>> >
>> >   PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
>> >   data.apply(Wait.on(firstWriteResults))
>> >       // Windows of this intermediate PCollection will be processed no earlier than when
>> >       // the respective window of firstWriteResults closes.
>> >       .apply(ParDo.of(...write to second database...));
>> >
>> > This is indeed what Spanner folks have done, and AFAIK they intend this
>> > for importing multiple dependent database tables - e.g. first import a
>> > parent table; when it's done, import the child table - all within one
>> > pipeline. You can see example code in the tests [3].
>> >
>> > Please note that this kind of stuff requires support from the IO
>> > connector - IO.write() has to return a result that can be waited on. The
>> > code of SpannerIO is a great example; another example is FileIO.write().
>> >
>> > People have expressed wishes for similar support in Bigtable and
>> > BigQuery connectors but it's not there yet. It would be really cool if
>> > somebody added it to these connectors or others (I think there was a
>> > recent thread discussing how to add it to BigQueryIO).
>> >
>> > [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
>> > [2] https://github.com/apache/beam/pull/4264
>> > [3] https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158
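To make the phrase asked about above a little more concrete: in the Beam Java SDK, Wait.on() takes its signals as PCollections (Wait.on(PCollection<?>... signals)), so a write transform that returns only PDone gives the pipeline nothing to wait on, whereas SpannerIO.write() and FileIO.write() return result objects that expose a PCollection usable as the signal. The plain-Java sketch below is a toy, single-process model of the per-window gating described in the email. It does not use the Beam SDK and does not reflect how Beam actually implements Wait.on(); the class WaitOnModel, its methods, the fixed 10-unit windows and the integer timestamps are all hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model of Wait.on() gating (NOT Beam code). Main-input elements are
 * assigned to fixed windows and held back until the signal's watermark shows that no
 * more signal elements can arrive in that window.
 */
class WaitOnModel {
    private final long windowSize;                  // length of each fixed window
    private long signalWatermark = Long.MIN_VALUE;  // no window of the signal is done yet
    // Buffered main-input elements keyed by window end, in ascending order.
    private final TreeMap<Long, List<String>> buffered = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    WaitOnModel(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Exclusive end of the fixed window [start, start + windowSize) containing timestamp. */
    private long windowEnd(long timestamp) {
        return timestamp - Math.floorMod(timestamp, windowSize) + windowSize;
    }

    /** Emits immediately if the signal is already done for this window, otherwise buffers. */
    void onMainElement(String value, long timestamp) {
        long end = windowEnd(timestamp);
        if (signalWatermark >= end) {
            emitted.add(value);
        } else {
            buffered.computeIfAbsent(end, k -> new ArrayList<>()).add(value);
        }
    }

    /** Advances the signal's watermark and releases every buffered window it has closed. */
    void advanceSignalWatermark(long watermark) {
        signalWatermark = Math.max(signalWatermark, watermark);
        Iterator<Map.Entry<Long, List<String>>> it = buffered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > signalWatermark) {
                break; // this and all later windows of the signal are still open
            }
            emitted.addAll(entry.getValue());
            it.remove();
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WaitOnModel model = new WaitOnModel(10);
        model.onMainElement("a", 3);         // window [0, 10)
        model.onMainElement("b", 12);        // window [10, 20)
        model.advanceSignalWatermark(10);    // signal done for [0, 10)
        System.out.println(model.emitted()); // [a]
        model.onMainElement("c", 5);         // [0, 10) already done: passes straight through
        model.advanceSignalWatermark(20);    // signal done for [10, 20)
        System.out.println(model.emitted()); // [a, c, b]
    }
}
```

Here "a" is held back until the signal's watermark reaches the end of window [0, 10), while "c", arriving after that window is already done, is emitted immediately. In a real pipeline the signal's watermark would be driven by the first write's result PCollection, which is why the connector has to return one.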