I think you are right: since "writer.close()" contains business logic, it
must be moved to @FinishBundle. The same applies to DeleteFn.
I'll create a Jira for that.
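
To make the intended change concrete, here is a rough, stdlib-only sketch of the idea, not the actual CassandraIO code. AsyncBundleWriter and its method names are hypothetical; they only mirror the roles of @ProcessElement, @FinishBundle and @Teardown, and the saveAsync function stands in for Mapper::saveAsync. The point is that elements are reported as written only after their futures have been waited on at the end of the bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Hypothetical model of the WriteFn lifecycle (no Beam dependency).
 * Method names mirror the roles of Beam's lifecycle annotations.
 */
class AsyncBundleWriter<T> {
  // Stand-in for Mapper::saveAsync; an assumption for illustration only.
  private final Function<T, CompletableFuture<Void>> saveAsync;
  private final List<T> pendingElements = new ArrayList<>();
  private final List<CompletableFuture<Void>> pendingWrites = new ArrayList<>();

  AsyncBundleWriter(Function<T, CompletableFuture<Void>> saveAsync) {
    this.saveAsync = saveAsync;
  }

  /** Role of @ProcessElement: start the write, but do not report it as done. */
  void processElement(T element) {
    pendingElements.add(element);
    pendingWrites.add(saveAsync.apply(element));
  }

  /**
   * Role of @FinishBundle: block until every write in the bundle completes,
   * then return the elements that can be output as "has been written".
   * If any write failed, join() throws and the bundle fails as a whole.
   */
  List<T> finishBundle() {
    try {
      CompletableFuture.allOf(pendingWrites.toArray(new CompletableFuture[0])).join();
      return new ArrayList<>(pendingElements);
    } finally {
      // Reset per-bundle state whether the bundle succeeded or failed.
      pendingElements.clear();
      pendingWrites.clear();
    }
  }

  /** Role of @Teardown: resource cleanup only; nothing is waited on or output here. */
  void teardown() {
    pendingElements.clear();
    pendingWrites.clear();
  }
}
```

In the real DoFn the returned elements would be output from @FinishBundle, which is what would let the write produce a PCollection usable as a signal for Wait.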

> On 25 Mar 2021, at 00:49, Kenneth Knowles <k...@apache.org> wrote:
> 
> Alex's idea sounds good and like what Vincent maybe implemented. I am just 
> reading really quickly so sorry if I missed something...
> 
> Checking out the code for the WriteFn<T> I see a big problem:
> 
>     @Setup
>     public void setup() {
>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>     }
> 
>     @ProcessElement
>     public void processElement(ProcessContext c)
>         throws ExecutionException, InterruptedException {
>       writer.mutate(c.element());
>     }
> 
>     @Teardown
>     public void teardown() throws Exception {
>       writer.close();
>       writer = null;
>     }
> 
> It is only in writer.close() that all async writes are waited on. This needs 
> to happen in @FinishBundle.
> 
> Did you discover this when implementing your own Cassandra.Write?
> 
> Until you have waited on the future, you should not output the element as 
> "has been written". And you cannot output from the @Teardown method, which is 
> just for cleaning up resources.
> 
> Am I reading this wrong?
> 
> Kenn
> 
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com> wrote:
> How about a PCollection containing every element which was successfully 
> written?
> Basically the same things which were passed into it.
> 
> Then you could act on every element after it's been successfully written to 
> the sink.
> 
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com> wrote:
> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com> wrote:
> +dev
> 
> Since we all agree that we should return something other than
> PDone, the real question is what we should return.
> 
> My proposal is that one returns a PCollection<?> that consists, internally, 
> of something contentless like nulls. This is future compatible with returning 
> something more meaningful based on the source or write process itself, 
> but at least this would be followable.
>  
> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
> 
> Yeah, we should go ahead and finally do something. 
>  
> 
> > Returning PDone is an anti-pattern that should be avoided, but changing it 
> > now would be backwards incompatible.
> 
> Periodic reminder: most IOs are still Experimental, so I suppose it is
> up to the maintainers to judge whether the upgrade to return something
> other than PDone is worth it; in that case we can deprecate and remove
> the previous signature in a short time (2 releases was the average for
> previous cases).
> 
> 
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
> <aromanenko....@gmail.com> wrote:
> >
> > I thought this was about returning a PCollection of write results, as is 
> > done in other IOs (which I mentioned as examples) that have _additional_ 
> > write methods, like "withWriteResults()" etc., that return PTransform<…, 
> > PCollection<WriteResults>>.
> > In this case, we keep backwards compatibility and just add new 
> > functionality. Though, we need to follow the same pattern for the user API 
> > and maybe even the naming for this feature across different IOs (like we 
> > have for "readAll()" methods).
> >
> > I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com> wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing it 
> > now would be backwards incompatible. PRs to add non-PDone returning 
> > variants (probably as another option to the builders) that compose well 
> > with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko
> > <aromanenko....@gmail.com> wrote:
> >>
> >> In that case, I think the "Wait" PTransform should work for you but, as 
> >> was mentioned before, it doesn't work with PDone, only with a PCollection 
> >> as a signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great to 
> >> contribute it back to Beam in the same way as was done for other IOs (for 
> >> example, JdbcIO [1] or BigtableIO [2]).
> >>
> >> In general, I think we need to have it for all IOs, at least to use with 
> >> "Wait", because this pattern is required quite often.
> >>
> >> [1] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
> >> [2] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
> >>
> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vincent.marq...@gmail.com>
> >> wrote:
> >>
> >> No, it only needs to ensure that one record seen on Pubsub has been 
> >> successfully written to a database.  So "record by record" is fine, or 
> >> even "bundle".
> >>
> >> ~Vincent
> >>
> >>
> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko
> >> <aromanenko....@gmail.com> wrote:
> >>>
> >>> Do you want to wait until ALL records are written to Cassandra and then 
> >>> write all successfully written records to PubSub, or should it be 
> >>> performed "record by record"?
> >>>
> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vincent.marq...@gmail.com>
> >>> wrote:
> >>>
> >>> I have a common use case where my pipeline looks like this:
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubsubIO.write
> >>>
> >>> I do NOT want my pipeline to look like the following:
> >>>
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
> >>>                                                          |
> >>>                                                           -> PubsubIO.write
> >>>
> >>> Because I need to ensure that only items that have successfully 
> >>> finished a (quorum) write are written to Pubsub.
> >>>
> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually use it 
> >>> here so I often roll my own 'writer', but maybe there is a recommended 
> >>> way of doing this?
> >>>
> >>> Thanks in advance for any help.
> >>>
> >>> ~Vincent
> >>>
> >>>
> >>
> >
