Thank you, Eugene, for your answer.

Based on your explanation, I think I will go with your third solution, as it seems the most robust and user-friendly approach.

Jonathan

On 21/02/2019 02:22, Eugene Kirpichov wrote:
Hi Jonathan,

Wait.on() requires a PCollection. It is not possible to change it to wait on a PDone, because all PDones in the pipeline are the same, so it's not clear what exactly you'd be waiting on.

To use the Wait transform with JdbcIO.write(), you would need to change https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L761-L762 to simply "return input.apply(ParDo.of(...))" and propagate that change into the type signature, so that the transform returns a PCollection<Void> instead of a PDone. Then you'd get a waitable PCollection<Void>.
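The difference between a shared completion token and a per-write signal can be illustrated outside Beam. The sketch below is an analogy in plain Java, not Beam code: a `CompletableFuture<Void>` returned by each write plays the role of the waitable `PCollection<Void>`, chaining with `thenRun` plays the role of `Wait.on()`, and in-memory maps stand in for the two SQL tables from the original question. The class name, method names, and sample values are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class WaitOnSignalSketch {

    // First write: assign a userUuid to each userId. Returning this write's own
    // future (rather than a shared "done" token) is what makes it waitable.
    public static CompletableFuture<Void> writeUsers(Map<String, String> usersTable,
                                                     List<String> userIds) {
        return CompletableFuture.runAsync(() -> {
            for (String userId : userIds) {
                // putIfAbsent mimics the unique constraint: a userId keeps a single userUuid.
                usersTable.putIfAbsent(userId, UUID.randomUUID().toString());
            }
        });
    }

    // Second write: reads userUuid back from the first table, so it is chained on
    // `signal` and only runs after the first write has completed.
    public static CompletableFuture<Void> writeVisitsAfter(CompletableFuture<Void> signal,
                                                           Map<String, String> usersTable,
                                                           List<String> visitsTable,
                                                           Map<String, String> firstVisits) {
        return signal.thenRun(() -> firstVisits.forEach(
                (userId, time) -> visitsTable.add(usersTable.get(userId) + "," + time)));
    }

    public static void main(String[] args) {
        Map<String, String> users = new ConcurrentHashMap<>();
        List<String> visits = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> usersWritten = writeUsers(users, List.of("alice", "bob"));
        writeVisitsAfter(usersWritten, users, visits, Map.of("alice", "2019-02-20T04:58")).join();
        // Prints one row containing alice's generated userUuid and her first-visit time.
        System.out.println(visits);
    }
}
```

Because each write returns its own future, the second write knows exactly which operation it depends on; a single shared token, like PDone, would give it nothing specific to chain on.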

This is a very simple but backwards-incompatible change. It is up to the Beam community whether and when to make it.

It's also possible to make a slightly larger but compatible change: JdbcIO.write() would stay as is, but you could write e.g. "JdbcIO.write().withResults()", a new transform that *does* return results and is waitable. A similar approach is taken in TextIO.write().withOutputFilenames().
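The compatible variant can be sketched as a builder pattern, again as a plain-Java analogy rather than Beam code: `Done` stands in for PDone and a `CompletableFuture<Void>` stands in for the waitable result. `Write`, `WriteWithResults`, `write()`, and this `withResults()` are hypothetical illustrations of the idea, not JdbcIO's actual classes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class WithResultsSketch {

    // Stand-in for PDone: a single shared token that carries no per-write identity.
    public enum Done { INSTANCE }

    public static <T> Write<T> write(Consumer<T> statement) {
        return new Write<>(statement);
    }

    // Existing transform: its public contract (returning Done) is unchanged.
    public static final class Write<T> {
        private final Consumer<T> statement;

        Write(Consumer<T> statement) { this.statement = statement; }

        // Opt-in variant: same write, but the caller receives a waitable signal.
        public WriteWithResults<T> withResults() {
            return new WriteWithResults<>(statement);
        }

        // Delegates to the result-returning variant, then discards its signal,
        // so existing callers see exactly the old behaviour.
        public Done expand(List<T> input) {
            withResults().expand(input);
            return Done.INSTANCE;
        }
    }

    // New transform: returns a signal that completes when this write has finished.
    public static final class WriteWithResults<T> {
        private final Consumer<T> statement;

        WriteWithResults(Consumer<T> statement) { this.statement = statement; }

        public CompletableFuture<Void> expand(List<T> input) {
            return CompletableFuture.runAsync(() -> input.forEach(statement));
        }
    }
}
```

Existing code calling `write(...).expand(...)` keeps compiling and behaving as before, while new code can call `write(...).withResults().expand(...)` and chain the next step on the returned signal. That is the sense in which this change is larger than the one-line edit but stays backwards-compatible.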

On Wed, Feb 20, 2019 at 4:58 AM Jonathan Perron <jonathan.per...@lumapps.com <mailto:jonathan.per...@lumapps.com>> wrote:

    Hello folks,

    I have run into a special case where I need to wait for a
    JdbcIO.write() operation to complete before starting a second one.

    In detail, I have a PCollection<Map<String, String>> which is used
    to fill two different SQL statements. It is used in a first
    JdbcIO.write() operation to store anonymized users in a table
    (userId with an associated userUuid generated with
    UUID.randomUUID()). These two parameters have a unique constraint,
    meaning that a userId cannot have multiple userUuids.
    Unfortunately, the UUID will be different on each run of my
    pipeline, so I need to query this table at some point, or to use
    what I describe below.

    I am planning to fill a second table with this userUuid and a few
    other pieces of information, such as the time of first visit. To
    limit I/O, and since my PCollection holds a lot of information, I
    want to use it once more with a different SQL statement, where the
    userUuid is read from the first table using a SELECT statement.
    This cannot work if the first JdbcIO.write() operation is not
    complete.

    I saw that the Java SDK provides a Wait.on() PTransform, but it is
    unfortunately only compatible with a PCollection, not with a PDone
    such as the one output by the JdbcIO operation. Could my issue be
    solved by extending Wait.on(), or should I go with another
    solution? If so, how could I implement it?

    Many thanks for your input!

    Jonathan
