Hi Jonathan,

I just wanted to let you know that this feature [1] has been implemented and 
finally merged into master, so it should be included in the next Beam release, 
2.13.

In short, a new method called "Write.withResults()" was added. It returns a 
WriteVoid transform that produces a "PCollection<Void>" as its output, which 
can be used together with "Wait.on()". A simple example of writing into two 
different databases could look like this:

PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
    .withDataSourceConfiguration(CONF_DB_1).withResults());
data.apply(Wait.on(firstWriteResults))
    .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
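
A slightly fuller sketch of the same pattern is given below, with the statement and the parameter setter filled in. The table names (users, visits), the column names, the map keys and the class name are hypothetical and only serve as an illustration; the second statement is one possible way to read the userUuid back from the first table, not a recommended schema.

```java
import java.util.Map;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class SequentialJdbcWrites {
  // Hypothetical tables and columns, used for illustration only.
  public static final String INSERT_USER_SQL =
      "INSERT INTO users (user_id, user_uuid) VALUES (?, ?)";
  public static final String INSERT_VISIT_SQL =
      "INSERT INTO visits (user_uuid, first_visit) "
          + "SELECT user_uuid, ? FROM users WHERE user_id = ?";

  public static void writeInOrder(
      PCollection<Map<String, String>> data,
      JdbcIO.DataSourceConfiguration confDb1,
      JdbcIO.DataSourceConfiguration confDb2) {
    // First write: withResults() makes the transform return a
    // PCollection<Void> instead of PDone, so it can serve as a signal.
    PCollection<Void> firstWriteResults =
        data.apply(
            "WriteUsers",
            JdbcIO.<Map<String, String>>write()
                .withDataSourceConfiguration(confDb1)
                .withStatement(INSERT_USER_SQL)
                .withPreparedStatementSetter(
                    (row, stmt) -> {
                      stmt.setString(1, row.get("userId"));
                      stmt.setString(2, row.get("userUuid"));
                    })
                .withResults());

    // Second write: the same input is held back by Wait.on() until the
    // first write has finished.
    data.apply("WaitForUsers", Wait.<Map<String, String>>on(firstWriteResults))
        .apply(
            "WriteVisits",
            JdbcIO.<Map<String, String>>write()
                .withDataSourceConfiguration(confDb2)
                .withStatement(INSERT_VISIT_SQL)
                .withPreparedStatementSetter(
                    (row, stmt) -> {
                      stmt.setString(1, row.get("firstVisit"));
                      stmt.setString(2, row.get("userId"));
                    }));
  }
}
```

Note that the second write still returns PDone; add withResults() there as well if something else has to wait on it.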

[1] https://issues.apache.org/jira/browse/BEAM-6732 

> On 22 Feb 2019, at 16:52, Alexey Romanenko <aromanenko....@gmail.com> wrote:
> 
> I have created a new Jira issue for this feature:
> https://issues.apache.org/jira/browse/BEAM-6732 
> 
> Jonathan, feel free to assign it to yourself if you want to contribute; 
> contributions are always welcome =)
> 
>> On 21 Feb 2019, at 10:23, Jonathan Perron <jonathan.per...@lumapps.com> wrote:
>> 
>> Thank you Eugene for your answer.
>> 
>> Based on your explanation, I think I will go with your 3rd solution, as 
>> this seems the most robust and user-friendly approach.
>> 
>> Jonathan
>> On 21/02/2019 02:22, Eugene Kirpichov wrote:
>>> Hi Jonathan,
>>> 
>>> Wait.on() requires a PCollection - it is not possible to change it to wait 
>>> on PDone because all PDone's in the pipeline are the same so it's not clear 
>>> what exactly you'd be waiting on.
>>> 
>>> To use the Wait transform with JdbcIO.write(), you would need to change 
>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L761-L762
>>> to simply "return input.apply(ParDo.of(...))" and propagate that into the 
>>> type signature. Then you'd get a waitable PCollection<Void>.
>>> 
>>> This is a very simple, but backwards-incompatible change. Up to the Beam 
>>> community whether/when people would want to make it.
>>> 
>>> It's also possible to make a slightly larger but compatible change, where 
>>> JdbcIO.write() would stay as is, but you could write e.g. 
>>> "JdbcIO.write().withResults()" which would be a new transform that *does* 
>>> return results and is waitable. A similar approach is taken in 
>>> TextIO.write().withOutputFilenames().
>>> 
>>> On Wed, Feb 20, 2019 at 4:58 AM Jonathan Perron 
>>> <jonathan.per...@lumapps.com> wrote:
>>> Hello folks,
>>> 
>>> I have run into a special case where I need to wait for a JdbcIO.write() 
>>> operation to complete before starting a second one.
>>> 
>>> In detail, I have a PCollection<Map<String, String>> which is used 
>>> to fill two different SQL statements. It is used in a first 
>>> JdbcIO.write() operation to store anonymized users in a table (a userId 
>>> with an associated userUuid generated with UUID.randomUUID()). These two 
>>> parameters have a unique constraint, meaning that a userId cannot have 
>>> multiple userUuids. Unfortunately, the generated UUID will differ from one 
>>> run of my pipeline to the next, meaning that I need to query this table at 
>>> some point, or use the approach I describe below.
>>> 
>>> I am planning to fill a second table with this userUuid along with some 
>>> other information, such as the time of first visit. To limit I/O, and since 
>>> I have a lot of information in my PCollection, I want to use it once more 
>>> with a different SQL statement, where the userUuid is read from the 
>>> first table using a SELECT statement. This cannot work if the first 
>>> JdbcIO.write() operation is not complete.
>>> 
>>> I saw that the Java SDK offers a Wait.on() PTransform, but it is 
>>> unfortunately only compatible with a PCollection, and not with a PDone such 
>>> as the one output by the JdbcIO operation. Could my issue be solved by 
>>> extending Wait.on(), or should I go with another solution? If so, 
>>> how could I implement it?
>>> 
>>> Many thanks for your input!
>>> 
>>> Jonathan
>>> 
> 
