I thought that was said about returning a PCollection of write results as it’s 
done in other IOs (as I mentioned as examples) that have _additional_ write 
methods, like “withWriteResults()” etc, that return PTransform<…, 
PCollection<WriteResults>>. 
In this case, we keep backwards compatibility and just add new funtionality. 
Though, we need to follow the same pattern for user API and maybe even naming 
for this feature across different IOs (like we have for "readAll()” methods).

 I agree that we have to avoid returning PDone for such cases. 

> On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com> wrote:
> 
> Returning PDone is an anti-pattern that should be avoided, but changing it 
> now would be backwards incompatible. PRs to add non-PDone returning variants 
> (probably as another option to the builders) that compose well with Wait, 
> etc. would be welcome. 
> 
> On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <aromanenko....@gmail.com 
> <mailto:aromanenko....@gmail.com>> wrote:
> In this way, I think “Wait” PTransform should work for you but, as it was 
> mentioned before, it doesn’t work with PDone, only with PCollection as a 
> signal. 
> 
> Since you already adjusted your own writer for that, it would be great to 
> contribute it back to Beam in the way as it was done for other IOs (for 
> example, JdbcIO [1] or BigtableIO [2])
> 
> In general, I think we need to have it for all IOs, at least to use with 
> “Wait” because this pattern it's quite often required.
> 
> [1] 
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>  
> <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078>
> [2] 
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>  
> <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715>
> 
>> On 24 Mar 2021, at 18:01, Vincent Marquez <vincent.marq...@gmail.com 
>> <mailto:vincent.marq...@gmail.com>> wrote:
>> 
>> No, it only needs to ensure that one record seen on Pubsub has successfully 
>> written to a database.  So "record by record" is fine, or even "bundle". 
>> 
>> ~Vincent
>> 
>> 
>> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <aromanenko....@gmail.com 
>> <mailto:aromanenko....@gmail.com>> wrote:
>> Do you want to wait for ALL records are written for Cassandra and then write 
>> all successfully written records to PubSub or it should be performed "record 
>> by record"?
>> 
>>> On 24 Mar 2021, at 04:58, Vincent Marquez <vincent.marq...@gmail.com 
>>> <mailto:vincent.marq...@gmail.com>> wrote:
>>> 
>>> I have a common use case where my pipeline looks like this:
>>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
>>> 
>>> I do NOT want my pipeline to look like the following:
>>> 
>>> CassandraIO.readAll -> Aggregate -> CassandraIO.write 
>>>                                                          |
>>>                                                           -> PubsubIO.write
>>> 
>>> Because I need to ensure that only items written to Pubsub have 
>>> successfully finished a (quorum) write.  
>>> 
>>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually use it 
>>> here so I often roll my own 'writer', but maybe there is a recommended way 
>>> of doing this? 
>>> 
>>> Thanks in advance for any help. 
>>> 
>>> ~Vincent
>> 
> 

Reply via email to