I think you are right: since "writer.close()" contains business logic (it is where all async writes are waited on), it must be moved to @FinishBundle. The same applies to DeleteFn. I’ll create a Jira for that.
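To make the intended change concrete, here is a minimal plain-Java sketch of the lifecycle. It is not the actual CassandraIO code: `AsyncMutator`, `WriteFnSketch`, `flush()` and `pendingWrites()` are hypothetical names made up for illustration, and the Beam annotations appear only as comments so the snippet runs without Beam on the classpath. The point is only that the blocking wait lives in the method that would carry @FinishBundle, while teardown just releases resources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical stand-in for the connector's Mutator: starts async writes and tracks their futures. */
class AsyncMutator<T> {
  private final Function<T, CompletableFuture<Void>> asyncWrite;
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();

  AsyncMutator(Function<T, CompletableFuture<Void>> asyncWrite) {
    this.asyncWrite = asyncWrite;
  }

  /** Starts one write and returns immediately. */
  void mutate(T element) {
    pending.add(asyncWrite.apply(element));
  }

  /** Blocks until every write started so far is done; throws CompletionException if any failed. */
  void flush() {
    try {
      CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    } finally {
      pending.clear();
    }
  }

  int pendingWrites() {
    return pending.size();
  }
}

/** Plain-Java imitation of the DoFn; comments name the Beam annotation each method would carry. */
class WriteFnSketch<T> {
  private final Function<T, CompletableFuture<Void>> asyncWrite;
  private AsyncMutator<T> writer;

  WriteFnSketch(Function<T, CompletableFuture<Void>> asyncWrite) {
    this.asyncWrite = asyncWrite;
  }

  // @Setup: allocate resources only.
  void setup() {
    writer = new AsyncMutator<>(asyncWrite);
  }

  // @ProcessElement: start the write, do not wait for it.
  void processElement(T element) {
    writer.mutate(element);
  }

  // @FinishBundle: wait here, so the bundle cannot succeed with writes still in flight.
  void finishBundle() {
    writer.flush();
  }

  // @Teardown: release resources only, no business logic.
  void teardown() {
    writer = null;
  }

  int pendingWrites() {
    return writer == null ? 0 : writer.pendingWrites();
  }
}
```

With this shape, a failed write surfaces as an exception from finishBundle(), which is the point where a runner can still fail and retry the bundle. Emitting the successfully written elements downstream would be a separate change on top of this.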
> On 25 Mar 2021, at 00:49, Kenneth Knowles <k...@apache.org> wrote:
>
> Alex's idea sounds good and like what Vincent maybe implemented. I am just
> reading really quickly so sorry if I missed something...
>
> Checking out the code for the WriteFn<T> I see a big problem:
>
>     @Setup
>     public void setup() {
>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c)
>         throws ExecutionException, InterruptedException {
>       writer.mutate(c.element());
>     }
>
>     @Teardown
>     public void teardown() throws Exception {
>       writer.close();
>       writer = null;
>     }
>
> It is only in writer.close() that all async writes are waited on. This needs
> to happen in @FinishBundle.
>
> Did you discover this when implementing your own Cassandra.Write?
>
> Until you have waited on the future, you should not output the element as
> "has been written". And you cannot output from the @Teardown method, which is
> just for cleaning up resources.
>
> Am I reading this wrong?
>
> Kenn
>
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajam...@google.com> wrote:
> How about a PCollection containing every element which was successfully
> written?
> Basically the same things which were passed into it.
>
> Then you could act on every element after it's been successfully written to
> the sink.
>
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <rober...@google.com> wrote:
> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ieme...@gmail.com> wrote:
> +dev
>
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
>
> My proposal is that one returns a PCollection<?> that consists, internally,
> of something contentless like nulls. This is future compatible with returning
> something more meaningful based on the source or write process itself,
> but at least this would be followable.
>
> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>
> Yeah, we should go ahead and finally do something.
>
> > Returning PDone is an anti-pattern that should be avoided, but changing it
> > now would be backwards incompatible.
>
> Periodic reminder: most IOs are still Experimental, so I suppose it is
> up to the maintainers to judge whether the upgrade to return something
> different from PDone is worth it. In that case we can deprecate and remove
> the previous signature in a short time (2 releases was the average for
> previous cases).
>
>
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> >
> > I thought that was said about returning a PCollection of write results as
> > it’s done in other IOs (as I mentioned as examples) that have _additional_
> > write methods, like “withWriteResults()” etc, that return PTransform<…,
> > PCollection<WriteResults>>.
> > In this case, we keep backwards compatibility and just add new
> > functionality. Though, we need to follow the same pattern for user API and
> > maybe even naming for this feature across different IOs (like we have for
> > “readAll()” methods).
> >
> > I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw <rober...@google.com> wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing it
> > now would be backwards incompatible.
> > PRs to add non-PDone returning
> > variants (probably as another option to the builders) that compose well
> > with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it was
> >> mentioned before, it doesn’t work with PDone, only with PCollection as a
> >> signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great to
> >> contribute it back to Beam in the same way as it was done for other IOs (for
> >> example, JdbcIO [1] or BigtableIO [2]).
> >>
> >> In general, I think we need to have it for all IOs, at least to use with
> >> “Wait”, because this pattern is quite often required.
> >>
> >> [1]
> >> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
> >> [2]
> >> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
> >>
> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vincent.marq...@gmail.com> wrote:
> >>
> >> No, it only needs to ensure that one record seen on Pubsub has
> >> been successfully written to a database. So "record by record" is fine, or
> >> even "bundle".
> >>
> >> ~Vincent
> >>
> >>
> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> >>>
> >>> Do you want to wait until ALL records are written to Cassandra and then
> >>> write all successfully written records to PubSub, or should it be
> >>> performed "record by record"?
> >>>
> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vincent.marq...@gmail.com> wrote:
> >>>
> >>> I have a common use case where my pipeline looks like this:
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
> >>>
> >>> I do NOT want my pipeline to look like the following:
> >>>
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
> >>>                            |
> >>>                             -> PubsubIO.write
> >>>
> >>> Because I need to ensure that only items written to Pubsub have
> >>> successfully finished a (quorum) write.
> >>>
> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually use it
> >>> here so I often roll my own 'writer', but maybe there is a recommended
> >>> way of doing this?
> >>>
> >>> Thanks in advance for any help.
> >>>
> >>> ~Vincent