Thank you, Eugene, for your answer.
Based on your explanation, I think I will go with your third solution,
as it seems the most robust and user-friendly approach.
Jonathan
On 21/02/2019 02:22, Eugene Kirpichov wrote:
Hi Jonathan,
Wait.on() requires a PCollection. It is not possible to change it to
wait on a PDone, because all PDones in the pipeline are the same, so
it's not clear what exactly you'd be waiting on.
To use the Wait transform with JdbcIO.write(), you would need to change
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L761-L762
to simply "return input.apply(ParDo.of(...))" and propagate that into the
type signature. Then you'd get a waitable PCollection<Void>.
This is a very simple but backwards-incompatible change. It is up to the
Beam community to decide whether and when to make it.
It's also possible to make a slightly larger but backwards-compatible
change, where JdbcIO.write() would stay as is, but you could write, e.g.,
"JdbcIO.write().withResults()", which would be a new transform that
*does* return results and is waitable. A similar approach is taken in
TextIO.write().withOutputFilenames().
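A minimal sketch of the idea above, assuming a Beam Java SDK that provides Wait.on() and VoidCoder. `WaitableWrite` and `writeInOrder` are hypothetical names, not part of Beam or JdbcIO, and `firstWriteFn` is assumed to be your own DoFn that performs the JDBC insert and emits nothing. The only difference from a PDone-returning write is that expand() returns the ParDo's (empty) PCollection<Void>, which can then be used as a Wait.on() signal.

```java
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;

/**
 * Hypothetical write transform (not part of Beam): like JdbcIO.write(), it
 * applies a ParDo that performs the writes, but expand() returns that ParDo's
 * output instead of PDone, so the result can be used as a Wait.on() signal.
 */
public class WaitableWrite<T> extends PTransform<PCollection<T>, PCollection<Void>> {
  // Assumed to write each element (e.g. over JDBC) and output nothing.
  private final DoFn<T, Void> writeFn;

  public WaitableWrite(DoFn<T, Void> writeFn) {
    this.writeFn = writeFn;
  }

  @Override
  public PCollection<Void> expand(PCollection<T> input) {
    // Instead of applying the ParDo and returning PDone.in(input.getPipeline()),
    // return the ParDo's own (empty) output, with an explicit coder for Void.
    return input.apply("Write", ParDo.of(writeFn)).setCoder(VoidCoder.of());
  }

  /** Hypothetical helper: applies secondWrite to rows only after the first write completes. */
  public static <T, OutputT extends POutput> OutputT writeInOrder(
      PCollection<T> rows,
      DoFn<T, Void> firstWriteFn,
      PTransform<PCollection<T>, OutputT> secondWrite) {
    PCollection<Void> firstDone = rows.apply("FirstWrite", new WaitableWrite<>(firstWriteFn));
    // Wait.on() holds back elements of rows until the matching window of firstDone is complete.
    return rows
        .apply("WaitForFirstWrite", Wait.on(firstDone))
        .apply("SecondWrite", secondWrite);
  }
}
```

Since only the first write has to be waitable, `secondWrite` could be an ordinary JdbcIO.<Map<String, String>>write(), which is a PTransform<PCollection<T>, PDone>. For a bounded input in the global window, this should mean the second write only processes elements once the first write has finished.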
On Wed, Feb 20, 2019 at 4:58 AM Jonathan Perron
<jonathan.per...@lumapps.com> wrote:
Hello folks,
I have run into a special case where I need to wait for a JdbcIO.write()
operation to complete before starting a second one.
In detail, I have a PCollection<Map<String, String>> which is used to
fill two different SQL statements. It is used in a first JdbcIO.write()
operation to store anonymized users in a table (userId with an
associated userUuid generated with UUID.randomUUID()). These two
parameters have a unique constraint, meaning that a userId cannot have
multiple userUuids. Unfortunately, the UUID will be different on each
run of my pipeline, meaning that I need to query this table at some
point, or use what I describe below.
I am planning to fill a second table with this userUuid along with some
other information, such as the time of first visit. To limit I/O, and
since my PCollection holds a lot of information, I want to use it once
more with a different SQL statement, in which the userUuid is read from
the first table using a SELECT statement. This cannot work if the first
JdbcIO.write() operation has not completed.
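To make the dependency concrete, here is a minimal sketch of how one element of the PCollection<Map<String, String>> could supply parameters for both statements. The class name, table names, column names and map keys ("userId", "firstVisit") are hypothetical. The second statement uses INSERT ... SELECT to read userUuid from the first table, which is why it depends on the first write: if the user row is not there yet, the SELECT matches nothing and no row is inserted.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch: one input element feeds two SQL statements. */
public final class UserStatements {
  // First write: store the anonymized user with a freshly generated UUID.
  static final String INSERT_USER =
      "INSERT INTO users (user_id, user_uuid) VALUES (?, ?)";

  // Second write: read the userUuid stored by the first write. If that row does
  // not exist yet, the SELECT matches no row and nothing is inserted.
  static final String INSERT_VISIT =
      "INSERT INTO visits (user_uuid, first_visit) "
          + "SELECT user_uuid, ? FROM users WHERE user_id = ?";

  /** Values for INSERT_USER's placeholders, in order (keys assumed present). */
  static List<String> userParams(Map<String, String> row) {
    return List.of(row.get("userId"), UUID.randomUUID().toString());
  }

  /** Values for INSERT_VISIT's placeholders, in order (keys assumed present). */
  static List<String> visitParams(Map<String, String> row) {
    return List.of(row.get("firstVisit"), row.get("userId"));
  }
}
```

With JdbcIO, each list would correspond to the values bound by the PreparedStatementSetter passed to withPreparedStatementSetter() on the respective write.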
I saw that the Java SDK provides a Wait.on() PTransform, but
unfortunately it only works with a PCollection, not with a PDone such as
the one output by the JdbcIO operation. Could my issue be solved by
extending Wait.on(), or should I go with another solution? If so, how
could I implement it?
Many thanks for your input!
Jonathan