I mean it has to return a PCollection (of any element type) whose elements represent the result of completing the processing of the respective window. E.g. FileIO.write() returns a PCollection of filenames; SpannerIO.write() simply returns a PCollection of Void.
However, connectors such as BigtableIO.write() and BigQueryIO.write() don't return such a PCollection. The former returns PDone; the latter returns a PCollection of failed inserts that in some cases is unconnected to the actual processing (when using load jobs).

On Thu, May 17, 2018 at 1:55 PM Ismaël Mejía <ieme...@gmail.com> wrote:

> This sounds super interesting and useful !
>
> Eugene can you please elaborate on this phrase 'has to return a result
> that can be waited on'. It is not clear for me what this means and I
> would like to understand this to evaluate what other IOs could
> potentially support this.
>
> On Thu, May 17, 2018 at 10:13 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
> > Thanks Kenn, forwarding to user@ is a good idea; just did that.
> >
> > JB - this is orthogonal to SDF, because I'd expect this transform to be
> > primarily used for waiting on the results of SomethingIO.write(),
> > whereas SDF is primarily useful for implementing SomethingIO.read().
> >
> > On Mon, May 14, 2018 at 10:25 PM Jean-Baptiste Onofré
> > <j...@nanthrax.net> wrote:
> >
> >> Cool !!!
> >>
> >> I guess we can leverage this in IOs with SDF.
> >>
> >> Thanks
> >> Regards
> >> JB
> >>
> >> On 14/05/2018 23:48, Eugene Kirpichov wrote:
> >> > Hi folks,
> >> >
> >> > Wanted to give a heads up about the existence of a commonly requested
> >> > feature and its first successful production usage.
> >> >
> >> > The feature is the Wait.on() transform [1] , and the first successful
> >> > production usage is in Spanner [2] .
> >> >
> >> > The Wait.on() transform allows you to "do this, then that" - in the
> >> > sense that a.apply(Wait.on(signal)) re-emits PCollection "a", but only
> >> > after the PCollection "signal" is "done" in the same window (i.e. when
> >> > no more elements can arrive into the same window of "signal"). The
> >> > PCollection "signal" is typically a collection of results of some
> >> > operation - so Wait.on(signal) allows you to wait until that operation
> >> > is done. It transparently works correctly in streaming pipelines too.
> >> >
> >> > This may sound a little convoluted, so the example from documentation
> >> > should help.
> >> >
> >> > PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
> >> > data.apply(Wait.on(firstWriteResults))
> >> >      // Windows of this intermediate PCollection will be processed no earlier than when
> >> >      // the respective window of firstWriteResults closes.
> >> >      .apply(ParDo.of(...write to second database...));
> >> >
> >> > This is indeed what Spanner folks have done, and AFAIK they intend this
> >> > for importing multiple dependent database tables - e.g. first import a
> >> > parent table; when it's done, import the child table - all within one
> >> > pipeline. You can see example code in the tests [3].
> >> >
> >> > Please note that this kind of stuff requires support from the IO
> >> > connector - IO.write() has to return a result that can be waited on.
> >> > The code of SpannerIO is a great example; another example is
> >> > FileIO.write().
> >> >
> >> > People have expressed wishes for similar support in Bigtable and
> >> > BigQuery connectors but it's not there yet. It would be really cool if
> >> > somebody added it to these connectors or others (I think there was a
> >> > recent thread discussing how to add it to BigQueryIO).
> >> >
> >> > [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
> >> > [2] https://github.com/apache/beam/pull/4264
> >> > [3] https://github.com/apache/beam/blob/a3ce091b3bbebf724c63be910bd3bc4cede4d11f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java#L158
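
In case the per-window wording in the quoted announcement is hard to picture, here is a toy model in plain Java. To be clear, this is not Beam code and it uses no Beam API: the class WaitOnSketch, its method names, and the window names "w1"/"w2" are all made up for illustration. It only mimics the described behavior, where elements of "a" are held back until the "signal" collection is done for the same window, and it says nothing about how Wait.on() is actually implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: this is NOT Beam code and does not use the Beam API.
// It mimics, per window, what a.apply(Wait.on(signal)) is described as doing.
class WaitOnSketch {
  // Elements of "a" that are held back, keyed by a hypothetical window name.
  private final Map<String, List<String>> held = new HashMap<>();
  // Windows of "signal" into which no more elements can arrive.
  private final Set<String> doneWindows = new HashSet<>();
  // Elements re-emitted downstream, in emission order.
  private final List<String> emitted = new ArrayList<>();

  // An element of "a" arrives in the given window.
  void onMainElement(String window, String element) {
    if (doneWindows.contains(window)) {
      // The signal is already done for this window, so pass it through.
      emitted.add(element);
    } else {
      // Otherwise hold the element until the signal window is done.
      held.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }
  }

  // "signal" is done for this window: release everything held for it.
  void onSignalDone(String window) {
    doneWindows.add(window);
    List<String> released = held.remove(window);
    if (released != null) {
      emitted.addAll(released);
    }
  }

  List<String> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    WaitOnSketch gate = new WaitOnSketch();
    gate.onMainElement("w1", "row-1");
    gate.onMainElement("w2", "row-2");
    System.out.println(gate.emitted()); // nothing yet: no signal window is done
    gate.onSignalDone("w1");
    System.out.println(gate.emitted()); // only the element held for w1
  }
}
```

The point of the model is that the gating is per window: finishing "w1" of the signal releases only the "w1" elements of the main input, which is why the connector's write() has to hand back a PCollection whose windows actually track the write.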